diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java index 0cff978..08d1648 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java @@ -182,6 +182,7 @@ class PscWriter checkNotNull(recoveredStates, "recoveredStates"), lastCheckpointId + 1); this.currentProducer = getTransactionalProducer(lastCheckpointId + 1); this.currentProducer.beginTransaction(); + LOG.info("producerPool.size(): " + producerPool.size()); } else if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE || deliveryGuarantee == DeliveryGuarantee.NONE) { this.currentProducer = new FlinkPscInternalProducer<>(this.pscProducerConfig, null); @@ -252,7 +253,7 @@ public List snapshotState(long checkpointId) throws IOException @Override public void close() throws Exception { closed = true; - LOG.debug("Closing writer with {}", currentProducer); + LOG.info("Closing writer with {}", currentProducer); closeAll( this::abortCurrentProducer, closer, @@ -344,6 +345,8 @@ private FlinkPscInternalProducer getTransactionalProducer(long c private FlinkPscInternalProducer getOrCreateTransactionalProducer( String transactionalId) { + LOG.info("producerPool.size() in getOrCreateTransactionalProducer: " + producerPool.size()); + LOG.info("producerPool: " + producerPool); FlinkPscInternalProducer producer = producerPool.poll(); try { if (producer == null) { diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java index 266ec5f..aa506a7 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java @@ -132,6 +132,7 @@ private int abortTransactionOfSubtask(String prefix, long startCheckpointId, int public void close() { LOG.info("Closing transaction aborter"); + Thread.dumpStack(); if (producer != null) { LOG.info("CloseAction: {}", closeAction); closeAction.accept(producer); diff --git a/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java b/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java index 0b0b584..d781d92 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/PscConsumer.java @@ -1844,7 +1844,7 @@ public Map getMessageIdByTimestamp(Map metrics() throws ClientException { ensureOpen(); - Map metrics = new HashMap<>(); + Map metrics = new ConcurrentHashMap<>(); for (PscBackendConsumer backendConsumer : backendConsumers) { metrics.putAll(backendConsumer.metrics()); }