Skip to content

Commit

Permalink
KAFKA-7660: fix parentSensors memory leak (apache#5953)
Browse files Browse the repository at this point in the history
In StreamsMetricsImpl, the parentSensors map was keeping references to Sensors after the sensors themselves had been removed.

Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
vvcephei authored and guozhangwang committed Nov 29, 2018
1 parent 4fb5520 commit bfbc32d
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,11 @@ private static Sensor createTaskAndNodeLatencyAndThroughputSensors(final String
final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);

final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);

return sensor;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,9 @@ public final Sensor taskLevelSensor(final String taskName,
public final void removeAllTaskLevelSensors(final String taskName) {
final String key = taskSensorPrefix(taskName);
synchronized (taskLevelSensors) {
if (taskLevelSensors.containsKey(key)) {
while (!taskLevelSensors.get(key).isEmpty()) {
metrics.removeSensor(taskLevelSensors.get(key).pop());
}
taskLevelSensors.remove(key);
final Deque<String> sensors = taskLevelSensors.remove(key);
while (sensors != null && !sensors.isEmpty()) {
metrics.removeSensor(sensors.pop());
}
}
}
Expand Down Expand Up @@ -152,10 +150,9 @@ public Sensor nodeLevelSensor(final String taskName,
public final void removeAllNodeLevelSensors(final String taskName, final String processorNodeName) {
final String key = nodeSensorPrefix(taskName, processorNodeName);
synchronized (nodeLevelSensors) {
if (nodeLevelSensors.containsKey(key)) {
while (!nodeLevelSensors.get(key).isEmpty()) {
metrics.removeSensor(nodeLevelSensors.get(key).pop());
}
final Deque<String> sensors = nodeLevelSensors.remove(key);
while (sensors != null && !sensors.isEmpty()) {
metrics.removeSensor(sensors.pop());
}
}
}
Expand Down Expand Up @@ -188,11 +185,9 @@ public final Sensor cacheLevelSensor(final String taskName,
public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) {
final String key = cacheSensorPrefix(taskName, cacheName);
synchronized (cacheLevelSensors) {
if (cacheLevelSensors.containsKey(key)) {
while (!cacheLevelSensors.get(key).isEmpty()) {
metrics.removeSensor(cacheLevelSensors.get(key).pop());
}
cacheLevelSensors.remove(key);
final Deque<String> strings = cacheLevelSensors.remove(key);
while (strings != null && !strings.isEmpty()) {
metrics.removeSensor(strings.pop());
}
}
}
Expand Down Expand Up @@ -225,11 +220,9 @@ public final Sensor storeLevelSensor(final String taskName,
public final void removeAllStoreLevelSensors(final String taskName, final String storeName) {
final String key = storeSensorPrefix(taskName, storeName);
synchronized (storeLevelSensors) {
if (storeLevelSensors.containsKey(key)) {
while (!storeLevelSensors.get(key).isEmpty()) {
metrics.removeSensor(storeLevelSensors.get(key).pop());
}
storeLevelSensors.remove(key);
final Deque<String> sensors = storeLevelSensors.remove(key);
while (sensors != null && !sensors.isEmpty()) {
metrics.removeSensor(sensors.pop());
}
}
}
Expand Down Expand Up @@ -413,12 +406,19 @@ public void removeSensor(final Sensor sensor) {
Objects.requireNonNull(sensor, "Sensor is null");
metrics.removeSensor(sensor.name());

final Sensor parent = parentSensors.get(sensor);
final Sensor parent = parentSensors.remove(sensor);
if (parent != null) {
metrics.removeSensor(parent.name());
}
}

/**
* Visible for testing
*/
Map<Sensor, Sensor> parentSensors() {
return Collections.unmodifiableMap(parentSensors);
}

private static String groupNameFromScope(final String scopeName) {
return "stream-" + scopeName + "-metrics";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
package org.apache.kafka.streams.processor.internals.metrics;


import org.apache.kafka.common.MetricName;
Expand All @@ -23,12 +23,21 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.junit.Test;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

public class StreamsMetricsImplTest {

Expand Down Expand Up @@ -62,6 +71,60 @@ public void testRemoveSensor() {

final Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor3);

assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors());
}

@Test
public void testMutiLevelSensorRemoval() {
final Metrics registry = new Metrics();
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, "");
for (final MetricName defaultMetric : registry.metrics().keySet()) {
registry.removeMetric(defaultMetric);
}

final String taskName = "taskName";
final String operation = "operation";
final Map<String, String> taskTags = mkMap(mkEntry("tkey", "value"));

final String processorNodeName = "processorNodeName";
final Map<String, String> nodeTags = mkMap(mkEntry("nkey", "value"));

final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);

final int numberOfTaskMetrics = registry.metrics().size();

final Sensor sensor1 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1);
addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);

assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));

metrics.removeAllNodeLevelSensors(taskName, processorNodeName);

assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));

final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);

assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));

final Sensor sensor2 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2);
addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);

assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));

metrics.removeAllNodeLevelSensors(taskName, processorNodeName);

assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));

metrics.removeAllTaskLevelSensors(taskName);

assertThat(registry.metrics().size(), equalTo(0));
}

@Test
Expand Down Expand Up @@ -115,21 +178,21 @@ public void testTotalMetricDoesntDecrease() {
final String operation = "op";

final Sensor sensor = streamsMetrics.addLatencyAndThroughputSensor(
scope,
entity,
operation,
Sensor.RecordingLevel.INFO
scope,
entity,
operation,
Sensor.RecordingLevel.INFO
);

final double latency = 100.0;
final MetricName totalMetricName = metrics.metricName(
"op-total",
"stream-scope-metrics",
"",
"client-id",
"",
"scope-id",
"entity"
"op-total",
"stream-scope-metrics",
"",
"client-id",
"",
"scope-id",
"entity"
);

final KafkaMetric totalMetric = metrics.metric(totalMetricName);
Expand Down

0 comments on commit bfbc32d

Please sign in to comment.