Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Low friction cleanups (suggested by IntelliJ) #361

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
17 changes: 3 additions & 14 deletions src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.linkedin.xinfra.monitor.services.ServiceFactory;
import java.io.BufferedReader;
import java.io.FileReader;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -60,11 +59,11 @@ public class XinfraMonitor {
*/

@SuppressWarnings({"rawtypes"})
public XinfraMonitor(Map<String, Map> allClusterProps) throws Exception {
public XinfraMonitor(Map<String, Map<String, Object>> allClusterProps) throws Exception {
_apps = new ConcurrentHashMap<>();
_services = new ConcurrentHashMap<>();

for (Map.Entry<String, Map> clusterProperty : allClusterProps.entrySet()) {
for (Map.Entry<String, Map<String, Object>> clusterProperty : allClusterProps.entrySet()) {
String clusterName = clusterProperty.getKey();
Map props = clusterProperty.getValue();
if (!props.containsKey(XinfraMonitorConstants.CLASS_NAME_CONFIG))
Expand Down Expand Up @@ -94,15 +93,6 @@ public XinfraMonitor(Map<String, Map> allClusterProps) throws Exception {
(config, now) -> _offlineRunnables.size());
}

private boolean constructorContainsClass(Constructor<?>[] constructors, Class<?> classObject) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was never called

for (int n = 0; n < constructors[0].getParameterTypes().length; ++n) {
if (constructors[0].getParameterTypes()[n].equals(classObject)) {
return true;
}
}
return false;
}

public synchronized void start() throws Exception {
if (!_isRunning.compareAndSet(false, true)) {
return;
Expand Down Expand Up @@ -165,7 +155,6 @@ public void awaitShutdown() {
service.awaitShutdown(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}

@SuppressWarnings("rawtypes")
public static void main(String[] args) throws Exception {
if (args.length <= 0) {
LOG.info("USAGE: java [options] " + XinfraMonitor.class.getName() + " config/xinfra-monitor.properties");
Expand All @@ -182,7 +171,7 @@ public static void main(String[] args) throws Exception {
}

@SuppressWarnings("unchecked")
Map<String, Map> props = new ObjectMapper().readValue(buffer.toString(), Map.class);
Map<String, Map<String, Object>> props = new ObjectMapper().readValue(buffer.toString(), Map.class);
XinfraMonitor xinfraMonitor = new XinfraMonitor(props);
xinfraMonitor.start();
LOG.info("Xinfra Monitor has started.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/
public class XinfraMonitorConstants {

public XinfraMonitorConstants() {
samba2 marked this conversation as resolved.
Show resolved Hide resolved

}
private XinfraMonitorConstants() {}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

utility class


public static final String TAGS_NAME = "name";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.linkedin.xinfra.monitor.services.configs.ProduceServiceConfig;
import com.linkedin.xinfra.monitor.services.configs.TopicManagementServiceConfig;
import com.linkedin.xinfra.monitor.services.metrics.ClusterTopicManipulationMetrics;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -39,6 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.xinfra.monitor.common.Utils.delay;
import static com.linkedin.xinfra.monitor.common.Utils.prettyPrint;

/*
Expand Down Expand Up @@ -94,14 +97,9 @@ public void start() throws Exception {
_topicManagementService.start();
CompletableFuture<Void> topicPartitionResult = _topicManagementService.topicPartitionResult();

try {
/* Delay 2 second to reduce the chance that produce and consumer thread has race condition
with TopicManagementService and MultiClusterTopicManagementService */
long threadSleepMs = TimeUnit.SECONDS.toMillis(2);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to concisely use the Utils.delay(...) method. Here, I additionally had to import delay statically since a Kafka Utils class is already used. (avoiding fully qualified import)

Thread.sleep(threadSleepMs);
} catch (InterruptedException e) {
throw new Exception("Interrupted while sleeping the thread", e);
}
delay(Duration.ofSeconds(2));
CompletableFuture<Void> topicPartitionFuture = topicPartitionResult.thenRun(() -> {
for (Service service : _allServices) {
if (!service.isRunning()) {
Expand Down
6 changes: 1 addition & 5 deletions src/main/java/com/linkedin/xinfra/monitor/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@
*/
public class Utils {
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
public static final int ZK_CONNECTION_TIMEOUT_MS = 30_000;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused

public static final int ZK_SESSION_TIMEOUT_MS = 30_000;
private static final long LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS = 60000L;
private static final int LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS = 3;
private static final String LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS_CONFIG = "list.partition.reassignment.timeout.ms";
Expand Down Expand Up @@ -117,9 +115,7 @@ public static List<Integer> replicaIdentifiers(Set<BrokerMetadata> brokers) {
Collections.shuffle(brokerMetadataList);

// Get broker ids for replica list
List<Integer> replicaList = brokerMetadataList.stream().map(m -> m.id()).collect(Collectors.toList());
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline


return replicaList;
return brokerMetadataList.stream().map(m -> m.id()).collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void commitAsync(OffsetCommitCallback callback) {

@Override
public OffsetAndMetadata committed(TopicPartition tp) {
return _consumer.committed(tp);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deprecated method

return _consumer.committed(Collections.singleton(tp)).get(tp);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public class ClusterTopicManipulationService implements Service {

private final ClusterTopicManipulationMetrics _clusterTopicManipulationMetrics;
private final TopicFactory _topicFactory;
private final String _zkConnect;

public ClusterTopicManipulationService(String name, AdminClient adminClient, Map<String, Object> props)
throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException,
Expand Down Expand Up @@ -96,7 +95,6 @@ public ClusterTopicManipulationService(String name, AdminClient adminClient, Map
TopicManagementServiceConfig.TOPIC_FACTORY_PROPS_CONFIG) : new HashMap();

_clusterTopicManipulationMetrics = new ClusterTopicManipulationMetrics(metrics, tags);
_zkConnect = config.getString(TopicManagementServiceConfig.ZOOKEEPER_CONNECT_CONFIG);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused

_topicFactory =
(TopicFactory) Class.forName(topicFactoryClassName).getConstructor(Map.class).newInstance(topicFactoryConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
Expand Down Expand Up @@ -117,9 +115,9 @@ public ConsumeService(String name,
topicPartitionFuture.get();
}

private void consume() throws Exception {
private void consume() {
/* Delay 1 second to reduce the chance that consumer creates topic before TopicManagementService */
Thread.sleep(1000);
Utils.delay(Duration.ofSeconds(1));

Map<Integer, Long> nextIndexes = new HashMap<>();

Expand All @@ -132,7 +130,7 @@ record = _baseConsumer.receive();
LOG.warn(_name + "/ConsumeService failed to receive record", e);
/* Avoid busy while loop */
//noinspection BusyWait
Thread.sleep(CONSUME_THREAD_SLEEP_MS);
Utils.delay(Duration.ofMillis(CONSUME_THREAD_SLEEP_MS));
continue;
}

Expand All @@ -152,16 +150,13 @@ record = _baseConsumer.receive();
int partition = record.partition();
/* Commit availability and commit latency service */
/* Call commitAsync, wait for a NON-NULL return value (see https://issues.apache.org/jira/browse/KAFKA-6183) */
OffsetCommitCallback commitCallback = new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap, Exception kafkaException) {
if (kafkaException != null) {
LOG.error("Exception while trying to perform an asynchronous commit.", kafkaException);
_commitAvailabilityMetrics._failedCommitOffsets.record();
} else {
_commitAvailabilityMetrics._offsetsCommitted.record();
_commitLatencyMetrics.recordCommitComplete();
}
OffsetCommitCallback commitCallback = (topicPartitionOffsetAndMetadataMap, kafkaException) -> {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using lambdas instead of anonymous class

if (kafkaException != null) {
LOG.error("Exception while trying to perform an asynchronous commit.", kafkaException);
_commitAvailabilityMetrics._failedCommitOffsets.record();
} else {
_commitAvailabilityMetrics._offsetsCommitted.record();
_commitLatencyMetrics.recordCommitComplete();
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,6 @@ private void reportMetrics() {
builder.append("\n");
}
}
LOG.info("{}\n{}", LOG_DIVIDER, builder.toString());
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

obsolete call

LOG.info("{}\n{}", LOG_DIVIDER, builder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,14 @@ public GraphiteMetricsReporterService(Map<String, Object> props, String name)

@Override
public synchronized void start() {
_executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
GraphiteMetricsReporterService.this.reportMetrics();
} catch (Exception e) {
LOG.error(_name + "/GraphiteMetricsReporterService failed to report metrics",
e);
}
_executor.scheduleAtFixedRate(() -> {
try {
GraphiteMetricsReporterService.this.reportMetrics();
} catch (Exception e) {
LOG.error(_name + "/GraphiteMetricsReporterService failed to report metrics",
e);
}
}, _reportIntervalSec, _reportIntervalSec, TimeUnit.SECONDS
}, _reportIntervalSec, _reportIntervalSec, TimeUnit.SECONDS
);
LOG.info("{}/GraphiteMetricsReporterService started", _name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,7 @@ public class OffsetCommitService implements Service {
_consumerNetworkClient = new ConsumerNetworkClient(logContext, kafkaClient, metadata, _time, retryBackoffMs,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), heartbeatIntervalMs);

ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, serviceName + SERVICE_SUFFIX);
}
};
ThreadFactory threadFactory = runnable -> new Thread(runnable, serviceName + SERVICE_SUFFIX);
_scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);

LOGGER.info("OffsetCommitService's ConsumerConfig - {}", Utils.prettyPrint(config.values()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ private Properties prepareConfigs(Map<String, Object> props) {

Map<String, String> customProps = (Map<String, String>) props.get(CommonServiceConfig.CONSUMER_PROPS_CONFIG);
if (customProps != null) {
for (Map.Entry<String, String> entry : customProps.entrySet()) {
consumerProps.put(entry.getKey(), entry.getValue());
}
consumerProps.putAll(customProps);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never would have spotted this myself - putAll is a convenient shortcut + also more performant (I know, not relevant here)

}

return consumerProps;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public SignalFxMetricsReporterService(Map<String, Object> props, String name) th

_executor = Executors.newSingleThreadScheduledExecutor();
_metricRegistry = new MetricRegistry();
_metricMap = new HashMap<String, SettableDoubleGauge>();
_dimensionsMap = new HashMap<String, String>();
_metricMap = new HashMap<>();
_dimensionsMap = new HashMap<>();
Comment on lines +55 to +56
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was redundant

if (props.containsKey(SignalFxMetricsReporterServiceConfig.SIGNALFX_METRIC_DIMENSION)) {
_dimensionsMap = (Map<String, String>) props.get(SignalFxMetricsReporterServiceConfig.SIGNALFX_METRIC_DIMENSION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Map;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
Expand Down Expand Up @@ -59,27 +58,24 @@ public OffsetCommitServiceMetrics(final Metrics metrics, final Map<String, Strin
"The total count of group coordinator unsuccessfully receiving consumer offset commit requests.", tags),
new CumulativeSum());

Measurable measurable = new Measurable() {
@Override
public double measure(MetricConfig config, long now) {
double offsetCommitSuccessRate = (double) metrics.metrics()
.get(metrics.metricName(SUCCESS_RATE_METRIC, METRIC_GROUP_NAME, tags))
.metricValue();
double offsetCommitFailureRate = (double) metrics.metrics()
.get(metrics.metricName(FAILURE_RATE_METRIC, METRIC_GROUP_NAME, tags))
.metricValue();

if (new Double(offsetCommitSuccessRate).isNaN()) {
offsetCommitSuccessRate = 0;
}

if (new Double(offsetCommitFailureRate).isNaN()) {
offsetCommitFailureRate = 0;
}

return offsetCommitSuccessRate + offsetCommitFailureRate > 0 ? offsetCommitSuccessRate / (
offsetCommitSuccessRate + offsetCommitFailureRate) : 0;
Measurable measurable = (config, now) -> {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using lambda

double offsetCommitSuccessRate = (double) metrics.metrics()
.get(metrics.metricName(SUCCESS_RATE_METRIC, METRIC_GROUP_NAME, tags))
.metricValue();
double offsetCommitFailureRate = (double) metrics.metrics()
.get(metrics.metricName(FAILURE_RATE_METRIC, METRIC_GROUP_NAME, tags))
.metricValue();

if (new Double(offsetCommitSuccessRate).isNaN()) {
offsetCommitSuccessRate = 0;
}

if (new Double(offsetCommitFailureRate).isNaN()) {
offsetCommitFailureRate = 0;
}

return offsetCommitSuccessRate + offsetCommitFailureRate > 0 ? offsetCommitSuccessRate / (
offsetCommitSuccessRate + offsetCommitFailureRate) : 0;
};

metrics.addMetric(new MetricName("offset-commit-availability-avg", METRIC_GROUP_NAME,
Expand Down
17 changes: 11 additions & 6 deletions src/test/java/com/linkedin/xinfra/monitor/XinfraMonitorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@

package com.linkedin.xinfra.monitor;

import com.linkedin.xinfra.monitor.common.Utils;
import com.linkedin.xinfra.monitor.services.ServiceFactory;
import com.linkedin.xinfra.monitor.services.Service;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.testng.Assert;
import org.testng.annotations.Test;


Expand Down Expand Up @@ -71,16 +76,16 @@ public void run() {

t.start();
xinfraMonitor.start();
Thread.sleep(100);
Utils.delay(Duration.ofMillis(100));
xinfraMonitor.stop();
t.join(500);
org.testng.Assert.assertFalse(t.isAlive());
org.testng.Assert.assertEquals(error.get(), null);
Assert.assertFalse(t.isAlive());
Assert.assertNull(error.get());
}

private XinfraMonitor xinfraMonitor() throws Exception {
FakeService.clearCounters();
Map<String, Map> config = new HashMap<>();
Map<String, Map<String, Object>> config = new HashMap<>();
Map<String, Object> fakeServiceConfig = new HashMap<>();

fakeServiceConfig.put(XinfraMonitorConstants.CLASS_NAME_CONFIG, FakeService.class.getName());
Expand All @@ -106,7 +111,7 @@ public FakeServiceFactory(Map config, String serviceInstanceName) {

@SuppressWarnings("unchecked")
@Override
public Service createService() throws Exception {
public Service createService() {

return new XinfraMonitorTest.FakeService(_config, _serviceInstanceName);

Expand All @@ -120,7 +125,7 @@ static final class FakeService implements Service {
private final AtomicBoolean _isRunning = new AtomicBoolean();

/** required */
public FakeService(Map<String, Map> config, String serviceInstanceName) {
public FakeService(Map<String, Map<String, Object>> config, String serviceInstanceName) {

}

Expand Down
Loading