Skip to content

Commit

Permalink
Add more debug logs to see why producerPool doesn't get added
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Oct 31, 2024
1 parent c406077 commit a44f0b6
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class PscWriter<IN>
checkNotNull(recoveredStates, "recoveredStates"), lastCheckpointId + 1);
this.currentProducer = getTransactionalProducer(lastCheckpointId + 1);
this.currentProducer.beginTransaction();
LOG.info("producerPool.size(): " + producerPool.size());
} else if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE
|| deliveryGuarantee == DeliveryGuarantee.NONE) {
this.currentProducer = new FlinkPscInternalProducer<>(this.pscProducerConfig, null);
Expand Down Expand Up @@ -252,7 +253,7 @@ public List<PscWriterState> snapshotState(long checkpointId) throws IOException
@Override
public void close() throws Exception {
closed = true;
LOG.debug("Closing writer with {}", currentProducer);
LOG.info("Closing writer with {}", currentProducer);
closeAll(
this::abortCurrentProducer,
closer,
Expand Down Expand Up @@ -344,6 +345,8 @@ private FlinkPscInternalProducer<byte[], byte[]> getTransactionalProducer(long c

private FlinkPscInternalProducer<byte[], byte[]> getOrCreateTransactionalProducer(
String transactionalId) {
LOG.info("producerPool.size() in getOrCreateTransactionalProducer: " + producerPool.size());
LOG.info("producerPool: " + producerPool);
FlinkPscInternalProducer<byte[], byte[]> producer = producerPool.poll();
try {
if (producer == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ private int abortTransactionOfSubtask(String prefix, long startCheckpointId, int

public void close() {
LOG.info("Closing transaction aborter");
Thread.dumpStack();
if (producer != null) {
LOG.info("CloseAction: {}", closeAction);
closeAction.accept(producer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1844,7 +1844,7 @@ public Map<TopicUriPartition, MessageId> getMessageIdByTimestamp(Map<TopicUriPar
*/
public Map<MetricName, Metric> metrics() throws ClientException {
ensureOpen();
Map<MetricName, Metric> metrics = new HashMap<>();
Map<MetricName, Metric> metrics = new ConcurrentHashMap<>();
for (PscBackendConsumer<K, V> backendConsumer : backendConsumers) {
metrics.putAll(backendConsumer.metrics());
}
Expand Down

0 comments on commit a44f0b6

Please sign in to comment.