diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 8483791e2e79b..ec4f8e5e5db14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -234,9 +234,11 @@ private static Sensor createTaskAndNodeLatencyAndThroughputSensors(final String final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG); addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent); addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + return sensor; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 8ec2711e764cc..dd6cc4a218573 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -115,11 +115,9 @@ public final Sensor taskLevelSensor(final String taskName, public final void removeAllTaskLevelSensors(final String taskName) { final String key = taskSensorPrefix(taskName); synchronized (taskLevelSensors) { - if (taskLevelSensors.containsKey(key)) { - while (!taskLevelSensors.get(key).isEmpty()) { - metrics.removeSensor(taskLevelSensors.get(key).pop()); - } - taskLevelSensors.remove(key); + final Deque sensors = taskLevelSensors.remove(key); + while (sensors != null && !sensors.isEmpty()) { + metrics.removeSensor(sensors.pop()); } } } @@ -152,10 +150,9 @@ public Sensor nodeLevelSensor(final String taskName, public final void removeAllNodeLevelSensors(final String taskName, final String processorNodeName) { final String key = nodeSensorPrefix(taskName, processorNodeName); synchronized (nodeLevelSensors) { - if (nodeLevelSensors.containsKey(key)) { - while (!nodeLevelSensors.get(key).isEmpty()) { - metrics.removeSensor(nodeLevelSensors.get(key).pop()); - } + final Deque sensors = nodeLevelSensors.remove(key); + while (sensors != null && !sensors.isEmpty()) { + metrics.removeSensor(sensors.pop()); } } } @@ -188,11 +185,9 @@ public final Sensor cacheLevelSensor(final String taskName, public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) { final String key = cacheSensorPrefix(taskName, cacheName); synchronized (cacheLevelSensors) { - if (cacheLevelSensors.containsKey(key)) { - while (!cacheLevelSensors.get(key).isEmpty()) { - metrics.removeSensor(cacheLevelSensors.get(key).pop()); - } - cacheLevelSensors.remove(key); + final Deque strings = cacheLevelSensors.remove(key); + while (strings != null && !strings.isEmpty()) { + metrics.removeSensor(strings.pop()); } } } @@ -225,11 +220,9 @@ public final Sensor storeLevelSensor(final String taskName, public final void removeAllStoreLevelSensors(final String taskName, final String storeName) { final String key = storeSensorPrefix(taskName, storeName); synchronized (storeLevelSensors) { - if (storeLevelSensors.containsKey(key)) { - while (!storeLevelSensors.get(key).isEmpty()) { - metrics.removeSensor(storeLevelSensors.get(key).pop()); - } - storeLevelSensors.remove(key); + final Deque sensors = storeLevelSensors.remove(key); + while (sensors != null && !sensors.isEmpty()) { + metrics.removeSensor(sensors.pop()); } } } @@ -413,12 +406,19 @@ public void removeSensor(final Sensor sensor) { Objects.requireNonNull(sensor, "Sensor is null"); metrics.removeSensor(sensor.name()); - final Sensor parent = parentSensors.get(sensor); + final Sensor parent = parentSensors.remove(sensor); if (parent != null) { metrics.removeSensor(parent.name()); } } + /** + * Visible for testing + */ + Map parentSensors() { + return Collections.unmodifiableMap(parentSensors); + } + private static String groupNameFromScope(final String scopeName) { return "stream-" + scopeName + "-metrics"; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java similarity index 59% rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index 7ce27b4b6d668..cadfdb0cef12b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.processor.internals; +package org.apache.kafka.streams.processor.internals.metrics; import org.apache.kafka.common.MetricName; @@ -23,12 +23,21 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.junit.Test; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; public class StreamsMetricsImplTest { @@ -62,6 +71,60 @@ public void testRemoveSensor() { final Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG); streamsMetrics.removeSensor(sensor3); + + assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors()); + } + + @Test + public void testMutiLevelSensorRemoval() { + final Metrics registry = new Metrics(); + final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, ""); + for (final MetricName defaultMetric : registry.metrics().keySet()) { + registry.removeMetric(defaultMetric); + } + + final String taskName = "taskName"; + final String operation = "operation"; + final Map taskTags = mkMap(mkEntry("tkey", "value")); + + final String processorNodeName = "processorNodeName"; + final Map nodeTags = mkMap(mkEntry("nkey", "value")); + + final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG); + addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + + final int numberOfTaskMetrics = registry.metrics().size(); + + final Sensor sensor1 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1); + addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + + assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics)); + + metrics.removeAllNodeLevelSensors(taskName, processorNodeName); + + assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics)); + + final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG); + addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + + assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics)); + + final Sensor sensor2 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2); + addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + + assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics)); + + metrics.removeAllNodeLevelSensors(taskName, processorNodeName); + + assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics)); + + metrics.removeAllTaskLevelSensors(taskName); + + assertThat(registry.metrics().size(), equalTo(0)); } @Test @@ -115,21 +178,21 @@ public void testTotalMetricDoesntDecrease() { final String operation = "op"; final Sensor sensor = streamsMetrics.addLatencyAndThroughputSensor( - scope, - entity, - operation, - Sensor.RecordingLevel.INFO + scope, + entity, + operation, + Sensor.RecordingLevel.INFO ); final double latency = 100.0; final MetricName totalMetricName = metrics.metricName( - "op-total", - "stream-scope-metrics", - "", - "client-id", - "", - "scope-id", - "entity" + "op-total", + "stream-scope-metrics", + "", + "client-id", + "", + "scope-id", + "entity" ); final KafkaMetric totalMetric = metrics.metric(totalMetricName);