From 8c9ebbed98066ed86f5f028c84f0fbdda51c03ad Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Wed, 30 Oct 2024 15:48:56 -0400 Subject: [PATCH] Add NPE check for updateNumBytesInCounter --- .../connector/psc/source/metrics/PscSourceReaderMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetrics.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetrics.java index 4a1e8d4..3d2c6d8 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetrics.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetrics.java @@ -239,7 +239,7 @@ public void removeRecordsLagMetric(TopicUriPartition tp) { * bytes-consumed-total and count it towards the counter. */ public void updateNumBytesInCounter() { - if (this.bytesConsumedTotalMetric != null) { + if (this.bytesConsumedTotalMetric != null && this.bytesConsumedTotalMetric.metricValue() != null) { long bytesConsumedUntilNow = ((Number) this.bytesConsumedTotalMetric.metricValue()).longValue(); long bytesConsumedSinceLastUpdate = bytesConsumedUntilNow - latestBytesConsumedTotal;