diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetrics.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetrics.java index 4a1e8d4..3d2c6d8 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetrics.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/metrics/PscSourceReaderMetrics.java @@ -239,7 +239,7 @@ public void removeRecordsLagMetric(TopicUriPartition tp) { * bytes-consumed-total and count it towards the counter. */ public void updateNumBytesInCounter() { - if (this.bytesConsumedTotalMetric != null) { + if (this.bytesConsumedTotalMetric != null && this.bytesConsumedTotalMetric.metricValue() != null) { long bytesConsumedUntilNow = ((Number) this.bytesConsumedTotalMetric.metricValue()).longValue(); long bytesConsumedSinceLastUpdate = bytesConsumedUntilNow - latestBytesConsumedTotal;