Skip to content

Commit

Permalink
[HUDI-7763] Fix that multiple jmx reporter can exist if metadata enab…
Browse files Browse the repository at this point in the history
…les (#11226)
  • Loading branch information
hwani3142 authored Jun 27, 2024
1 parent c80b559 commit ea6a614
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.common.testutils.NetworkTestUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieException;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -33,6 +34,8 @@
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.when;

/**
Expand Down Expand Up @@ -78,4 +81,55 @@ public void testRegisterGaugeByRangerPort() {
assertEquals("123", metrics.getRegistry().getGauges()
.get("jmx_metric2").getValue().toString());
}

@Test
public void testMultipleJmxReporterServer() {
String ports = "9889-9890";
clearInvocations(metricsConfig);
when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
when(metricsConfig.getJmxHost()).thenReturn("localhost");
when(metricsConfig.getJmxPort()).thenReturn(ports);
when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
hoodieMetrics = new HoodieMetrics(writeConfig, HoodieTestUtils.getDefaultStorage());
metrics = hoodieMetrics.getMetrics();

clearInvocations(metricsConfig);
when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
when(metricsConfig.getJmxHost()).thenReturn("localhost");
when(metricsConfig.getJmxPort()).thenReturn(ports);
when(metricsConfig.getBasePath()).thenReturn("s3://test2" + UUID.randomUUID());

hoodieMetrics = new HoodieMetrics(writeConfig, HoodieTestUtils.getDefaultStorage());
Metrics metrics2 = hoodieMetrics.getMetrics();

metrics.registerGauge("jmx_metric3", 123L);
assertEquals("123", metrics.getRegistry().getGauges()
.get("jmx_metric3").getValue().toString());

metrics2.registerGauge("jmx_metric4", 123L);
assertEquals("123", metrics2.getRegistry().getGauges()
.get("jmx_metric4").getValue().toString());
}

@Test
public void testMultipleJmxReporterServerFailedForOnePort() {
String ports = "9891";
clearInvocations(metricsConfig);
when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
when(metricsConfig.getJmxHost()).thenReturn("localhost");
when(metricsConfig.getJmxPort()).thenReturn(ports);
when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
hoodieMetrics = new HoodieMetrics(writeConfig, HoodieTestUtils.getDefaultStorage());
metrics = hoodieMetrics.getMetrics();

clearInvocations(metricsConfig);
when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
when(metricsConfig.getJmxHost()).thenReturn("localhost");
when(metricsConfig.getJmxPort()).thenReturn(ports);
when(metricsConfig.getBasePath()).thenReturn("s3://test2" + UUID.randomUUID());

assertThrows(HoodieException.class, () -> {
hoodieMetrics = new HoodieMetrics(writeConfig, HoodieTestUtils.getDefaultStorage());
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.management.MBeanServer;

import java.lang.management.ManagementFactory;
import java.rmi.server.ExportException;
import java.util.Objects;
import java.util.stream.IntStream;

Expand All @@ -53,16 +54,13 @@ public JmxMetricsReporter(HoodieMetricsConfig config, MetricRegistry registry) {
host, portsConfig));
}
int[] ports = getPortRangeFromString(portsConfig);
boolean successfullyStartedServer = false;
for (int port : ports) {
jmxReporterServer = createJmxReport(host, port);
LOG.info("Started JMX server on port " + port + ".");
successfullyStartedServer = true;
break;
}
initializeJmxReporterServer(host, ports);

boolean successfullyStartedServer = isServerCreated();
if (!successfullyStartedServer) {
throw new HoodieException(
"Could not start JMX server on any configured port. Ports: " + portsConfig);
"Could not start JMX server on any configured port. Ports: " + portsConfig
+ ". Maybe require port range for multiple hoodie tables");
}
LOG.info("Configured JMXReporter with {port:" + portsConfig + "}");
} catch (Exception e) {
Expand All @@ -72,9 +70,29 @@ public JmxMetricsReporter(HoodieMetricsConfig config, MetricRegistry registry) {
}
}

private boolean isServerCreated() {
return jmxReporterServer != null;
}

private void initializeJmxReporterServer(String host, int[] ports) {
for (int port : ports) {
try {
jmxReporterServer = createJmxReport(host, port);
LOG.info("Started JMX server on port " + port + ".");
break;
} catch (Exception e) {
if (e.getCause() instanceof ExportException) {
LOG.info("Skip for initializing jmx port " + port + " because of already in use");
} else {
LOG.info("Failed to initialize jmx port " + port + ". " + e.getMessage());
}
}
}
}

@Override
public void start() {
if (jmxReporterServer != null) {
if (isServerCreated()) {
jmxReporterServer.start();
} else {
LOG.error("Cannot start as the jmxReporter is null.");
Expand Down

0 comments on commit ea6a614

Please sign in to comment.