From 59de3e9a6ffc20684ed482e579db04a6c07cbb5e Mon Sep 17 00:00:00 2001 From: yechun Date: Sat, 6 May 2023 12:15:08 +0800 Subject: [PATCH] [ISSUE #494] support prometheus --- metric-exporter/pom.xml | 8 + .../connect/metrics/DropwizardExports.java | 234 ++++++++++++++++++ .../metrics/PrometheusSampleBuilder.java | 59 +++++ pom.xml | 12 + rocketmq-connect-runtime/pom.xml | 5 +- .../connect/runtime/config/WorkerConfig.java | 11 + .../runtime/connectorwrapper/Worker.java | 4 + .../rest/PrometheusMetricsServlet.java | 127 ++++++++++ .../connect/runtime/rest/RestHandler.java | 20 ++ 9 files changed, 479 insertions(+), 1 deletion(-) create mode 100644 metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/DropwizardExports.java create mode 100644 metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/PrometheusSampleBuilder.java create mode 100644 rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/PrometheusMetricsServlet.java diff --git a/metric-exporter/pom.xml b/metric-exporter/pom.xml index 658b28b54..988ffd813 100644 --- a/metric-exporter/pom.xml +++ b/metric-exporter/pom.xml @@ -41,6 +41,14 @@ com.google.guava guava + + io.prometheus + simpleclient_dropwizard + + + io.prometheus + simpleclient + diff --git a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/DropwizardExports.java b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/DropwizardExports.java new file mode 100644 index 000000000..bbbc0ee48 --- /dev/null +++ b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/DropwizardExports.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.metrics; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; +import io.prometheus.client.dropwizard.samplebuilder.DefaultSampleBuilder; +import io.prometheus.client.dropwizard.samplebuilder.SampleBuilder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.rocketmq.connect.metrics.stats.Stat; + +/** + * Collect Dropwizard metrics from a MetricRegistry. + */ +public class DropwizardExports extends io.prometheus.client.Collector implements io.prometheus.client.Collector.Describable { + private static final Logger LOGGER = Logger.getLogger(DropwizardExports.class.getName()); + private MetricRegistry registry; + private MetricFilter metricFilter; + private SampleBuilder sampleBuilder; + + /** + * Creates a new DropwizardExports with a {@link DefaultSampleBuilder} and {@link MetricFilter#ALL}. + * + * @param registry a metric registry to export in prometheus. + */ + public DropwizardExports(MetricRegistry registry) { + this.registry = registry; + this.metricFilter = MetricFilter.ALL; + this.sampleBuilder = new DefaultSampleBuilder(); + } + + /** + * Creates a new DropwizardExports with a {@link DefaultSampleBuilder} and custom {@link MetricFilter}. + * + * @param registry a metric registry to export in prometheus. + * @param metricFilter a custom metric filter. + */ + public DropwizardExports(MetricRegistry registry, MetricFilter metricFilter) { + this.registry = registry; + this.metricFilter = metricFilter; + this.sampleBuilder = new DefaultSampleBuilder(); + } + + /** + * @param registry a metric registry to export in prometheus. + * @param sampleBuilder sampleBuilder to use to create prometheus samples. + */ + public DropwizardExports(MetricRegistry registry, SampleBuilder sampleBuilder) { + this.registry = registry; + this.metricFilter = MetricFilter.ALL; + this.sampleBuilder = sampleBuilder; + } + + /** + * @param registry a metric registry to export in prometheus. + * @param metricFilter a custom metric filter. + * @param sampleBuilder sampleBuilder to use to create prometheus samples. + */ + public DropwizardExports(MetricRegistry registry, MetricFilter metricFilter, SampleBuilder sampleBuilder) { + this.registry = registry; + this.metricFilter = metricFilter; + this.sampleBuilder = sampleBuilder; + } + + private static String getHelpMessage(String metricName, Metric metric) { + return String.format("Generated from Dropwizard metric import (metric=%s, type=%s)", metricName, metric.getClass().getName()); + } + + /** + * Export counter as Prometheus Gauge. + */ + MetricFamilySamples fromCounter(String dropwizardName, Counter counter) { + MetricFamilySamples.Sample sample = sampleBuilder.createSample(dropwizardName, "", new ArrayList(), new ArrayList(), new Long(counter.getCount()).doubleValue()); + return new MetricFamilySamples(sample.name, Type.GAUGE, getHelpMessage(dropwizardName, counter), Arrays.asList(sample)); + } + + /** + * Export gauge as a prometheus gauge. + */ + MetricFamilySamples fromGauge(String dropwizardName, Gauge gauge) { + Object obj = gauge.getValue(); + double value; + if (obj instanceof Number) { + value = ((Number) obj).doubleValue(); + } else if (obj instanceof Boolean) { + value = ((Boolean) obj) ? 1 : 0; + } else { + LOGGER.log(Level.FINE, String.format("Invalid type for Gauge %s: %s", sanitizeMetricName(dropwizardName), obj == null ? "null" : obj.getClass().getName())); + return null; + } + MetricFamilySamples.Sample sample = sampleBuilder.createSample(dropwizardName, "", new ArrayList(), new ArrayList(), value); + return new MetricFamilySamples(sample.name, Type.GAUGE, getHelpMessage(dropwizardName, gauge), Arrays.asList(sample)); + } + + /** + * Export a histogram snapshot as a prometheus SUMMARY. + * + * @param dropwizardName metric name. + * @param snapshot the histogram snapshot. + * @param count the total sample count for this snapshot. + * @param factor a factor to apply to histogram values. + */ + MetricFamilySamples fromSnapshotAndCount(String dropwizardName, Snapshot snapshot, long count, double factor, + String helpMessage) { + MetricName metricName = MetricUtils.stringToMetricName(dropwizardName); + Stat.HistogramType histogramType = Stat.HistogramType.valueOf(metricName.getType()); + List samples = new ArrayList<>(); + switch (histogramType) { + case Avg: + samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", new ArrayList(), new ArrayList(), snapshot.getMean() * factor)); + break; + case Min: + samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", new ArrayList(), new ArrayList(), snapshot.getMin() * factor)); + break; + case Max: + samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", new ArrayList(), new ArrayList(), snapshot.getMax() * factor)); + break; + case Percentile_75th: + samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.75"), snapshot.get75thPercentile() * factor)); + break; + case Percentile_95th: + samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.95"), snapshot.get95thPercentile() * factor)); + break; + case Percentile_98th: + samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.98"), snapshot.get98thPercentile() * factor)); + break; + case Percentile_99th: + samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.99"), snapshot.get99thPercentile() * factor)); + break; + case Percentile_999th: + samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.999"), snapshot.get999thPercentile() * factor)); + break; + default: + samples = Arrays.asList(sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.5"), snapshot.getMedian() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.5"), snapshot.getMedian() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.75"), snapshot.get75thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.95"), snapshot.get95thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.98"), snapshot.get98thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.99"), snapshot.get99thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "", Arrays.asList("quantile"), Arrays.asList("0.999"), snapshot.get999thPercentile() * factor), sampleBuilder.createSample(dropwizardName, "_count", new ArrayList(), new ArrayList(), count)); + + } + return new MetricFamilySamples(samples.get(0).name, Type.SUMMARY, helpMessage, samples); + } + + /** + * Convert histogram snapshot. + */ + MetricFamilySamples fromHistogram(String dropwizardName, Histogram histogram) { + return fromSnapshotAndCount(dropwizardName, histogram.getSnapshot(), histogram.getCount(), 1.0, getHelpMessage(dropwizardName, histogram)); + } + + /** + * Export Dropwizard Timer as a histogram. Use TIME_UNIT as time unit. + */ + MetricFamilySamples fromTimer(String dropwizardName, Timer timer) { + return fromSnapshotAndCount(dropwizardName, timer.getSnapshot(), timer.getCount(), 1.0D / TimeUnit.SECONDS.toNanos(1L), getHelpMessage(dropwizardName, timer)); + } + + /** + * Export a Meter as as prometheus COUNTER. + */ + MetricFamilySamples fromMeter(String dropwizardName, Meter meter) { + MetricName metricName = MetricUtils.stringToMetricName(dropwizardName); + final MetricFamilySamples.Sample sample = sampleBuilder.createSample(dropwizardName, "", new ArrayList(), new ArrayList(), MetricUtils.getMeterValue(metricName, meter)); + return new MetricFamilySamples(sample.name, Type.COUNTER, getHelpMessage(dropwizardName, meter), Arrays.asList(sample)); + } + + @Override + public List collect() { + Map mfSamplesMap = new HashMap(); + + for (SortedMap.Entry entry : registry.getGauges(metricFilter).entrySet()) { + addToMap(mfSamplesMap, fromGauge(entry.getKey(), entry.getValue())); + } + for (SortedMap.Entry entry : registry.getCounters(metricFilter).entrySet()) { + addToMap(mfSamplesMap, fromCounter(entry.getKey(), entry.getValue())); + } + for (SortedMap.Entry entry : registry.getHistograms(metricFilter).entrySet()) { + addToMap(mfSamplesMap, fromHistogram(entry.getKey(), entry.getValue())); + } + for (SortedMap.Entry entry : registry.getTimers(metricFilter).entrySet()) { + addToMap(mfSamplesMap, fromTimer(entry.getKey(), entry.getValue())); + } + for (SortedMap.Entry entry : registry.getMeters(metricFilter).entrySet()) { + addToMap(mfSamplesMap, fromMeter(entry.getKey(), entry.getValue())); + } + return new ArrayList(mfSamplesMap.values()); + } + + private void addToMap(Map mfSamplesMap, MetricFamilySamples newMfSamples) { + if (newMfSamples != null) { + MetricFamilySamples currentMfSamples = mfSamplesMap.get(newMfSamples.name); + if (currentMfSamples == null) { + mfSamplesMap.put(newMfSamples.name, newMfSamples); + } else { + Set samples = new HashSet(currentMfSamples.samples); + samples.addAll(newMfSamples.samples); + List list = new ArrayList<>(samples); + mfSamplesMap.put(newMfSamples.name, new MetricFamilySamples(newMfSamples.name, currentMfSamples.type, currentMfSamples.help, list)); + } + } + } + + @Override + public List describe() { + return new ArrayList(); + } +} \ No newline at end of file diff --git a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/PrometheusSampleBuilder.java b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/PrometheusSampleBuilder.java new file mode 100644 index 000000000..54f17b391 --- /dev/null +++ b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/PrometheusSampleBuilder.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.metrics; + +import io.prometheus.client.Collector; +import io.prometheus.client.dropwizard.samplebuilder.SampleBuilder; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.lang3.StringUtils; + +public class PrometheusSampleBuilder implements SampleBuilder { + private static final List SOURCE_TASK_LABEL_NAMES = Arrays.asList("metric_group", "data_type", "connector", "task"); + + @Override + public Collector.MetricFamilySamples.Sample createSample(String dropwizardName, String nameSuffix, + List additionalLabelNames, List additionalLabelValues, double value) { + String suffix = nameSuffix == null ? "" : nameSuffix; + List labelValues = sanitizeLabelValues(dropwizardName); + return new Collector.MetricFamilySamples.Sample(sanitizeMetricName(dropwizardName + suffix), SOURCE_TASK_LABEL_NAMES, labelValues, value); + } + + public String sanitizeMetricName(String dropwizardName) { + return dropwizardName.split(":")[1].split(",")[1].replaceAll("-", "_"); + } + + public List sanitizeLabelValues(String dropwizardName) { + String[] var = dropwizardName.split(":"); + String[] split = var[1].split(","); + + String metricGroup = split[0]; + String metricName = split[1].replaceAll("-", "_"); + String dateType = split[2]; + String var3 = split[3]; + String connectorName = StringUtils.EMPTY; + if (!StringUtils.equals(var3, "")) { + connectorName = var3.substring(var3.indexOf("=") + 1); + } + String var4 = split[4]; + String taskid = StringUtils.EMPTY; + if (!StringUtils.equals(var4, "")) { + taskid = var4.substring(var4.indexOf("=") + 1); + } + return Arrays.asList(metricGroup, dateType, connectorName, taskid); + } +} diff --git a/pom.xml b/pom.xml index 298f5d2a8..e49f6dc3f 100644 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,8 @@ 1.8 1.8 3.12.0 + 0.6.0 + @@ -202,6 +204,16 @@ commons-lang3 ${commons-lang3.version} + + io.prometheus + simpleclient + ${prometheus.version} + + + io.prometheus + simpleclient_dropwizard + ${prometheus.version} + diff --git a/rocketmq-connect-runtime/pom.xml b/rocketmq-connect-runtime/pom.xml index 95f43c3a7..1413646e7 100644 --- a/rocketmq-connect-runtime/pom.xml +++ b/rocketmq-connect-runtime/pom.xml @@ -264,7 +264,10 @@ metrics-core 4.2.0 - + + io.prometheus + simpleclient + org.apache.rocketmq metric-exporter diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java index dd5866073..fd0d1d17c 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java @@ -58,6 +58,8 @@ public class WorkerConfig { */ private int httpPort = 8082; + private int exporterPort = 5557; + /** * plugin paths config; * Multiple use ',' split @@ -591,6 +593,14 @@ public void setStateManagementService(String stateManagementService) { this.stateManagementService = stateManagementService; } + public int getExporterPort() { + return exporterPort; + } + + public void setExporterPort(int exporterPort) { + this.exporterPort = exporterPort; + } + @Override public String toString() { return "WorkerConfig{" + @@ -638,6 +648,7 @@ public String toString() { ", configManagementService='" + configManagementService + '\'' + ", positionManagementService='" + positionManagementService + '\'' + ", stateManagementService='" + stateManagementService + '\'' + + ", exporterPort=" + exporterPort + '}'; } } diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java index 496e02767..de6e5fe75 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java @@ -27,6 +27,7 @@ import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.connector.api.data.RecordConverter; import io.openmessaging.connector.api.errors.ConnectException; +import io.prometheus.client.CollectorRegistry; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -50,6 +51,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.connect.metrics.DropwizardExports; +import org.apache.rocketmq.connect.metrics.PrometheusSampleBuilder; import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue; import org.apache.rocketmq.connect.runtime.common.LoggerName; import org.apache.rocketmq.connect.runtime.config.ConnectorConfig; @@ -148,6 +151,7 @@ public Worker(WorkerConfig workerConfig, this.executor = Executors.newCachedThreadPool(); this.connectMetrics = new ConnectMetrics(workerConfig); this.stateManagementService = stateManagementService; + CollectorRegistry.defaultRegistry.register(new DropwizardExports(connectMetrics.registry(), new PrometheusSampleBuilder())); } public void start() { diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/PrometheusMetricsServlet.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/PrometheusMetricsServlet.java new file mode 100644 index 000000000..0cf915b8a --- /dev/null +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/PrometheusMetricsServlet.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.runtime.rest; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +public class PrometheusMetricsServlet extends HttpServlet { + private CollectorRegistry registry; + + public PrometheusMetricsServlet() { + this(CollectorRegistry.defaultRegistry); + } + + public PrometheusMetricsServlet(CollectorRegistry registry) { + this.registry = registry; + } + + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + resp.setStatus(200); + resp.setContentType("text/plain; version=0.0.4; charset=utf-8"); + StringWriter writer = new StringWriter(); + + this.writeEscapedHelp(writer, registry); + resp.getOutputStream().print(writer.toString()); + } + + public void writeEscapedHelp(StringWriter writer, CollectorRegistry registry) throws IOException { + Enumeration metricFamilySamplesEnumeration = registry.metricFamilySamples(); + List list = new ArrayList<>(); + while (metricFamilySamplesEnumeration.hasMoreElements()) { + Collector.MetricFamilySamples metricFamilySamples = metricFamilySamplesEnumeration.nextElement(); + list.add(metricFamilySamples); + } + writeEscapedHelp(writer, list); + } + + public void writeEscapedHelp(StringWriter writer, List mfs) throws IOException { + if (Objects.nonNull(mfs) && mfs.size() != 0) { + for (Collector.MetricFamilySamples metricFamilySamples : mfs) { + for (Iterator var3 = metricFamilySamples.samples.iterator(); var3.hasNext(); writer.write(10)) { + Collector.MetricFamilySamples.Sample sample = (Collector.MetricFamilySamples.Sample) var3.next(); + writer.write(sample.name); + if (sample.labelNames.size() > 0) { + writer.write(123); + + for (int i = 0; i < sample.labelNames.size(); ++i) { + writer.write((String) sample.labelNames.get(i)); + writer.write("=\""); + writeEscapedLabelValue(writer, (String) sample.labelValues.get(i)); + writer.write("\","); + } + + writer.write(125); + } + + writer.write(32); + writer.write(Collector.doubleToGoString(sample.value)); + if (sample.timestampMs != null) { + writer.write(32); + writer.write(sample.timestampMs.toString()); + } + } + } + } + + } + + private static void writeEscapedLabelValue(Writer writer, String s) throws IOException { + for (int i = 0; i < s.length(); ++i) { + char c = s.charAt(i); + switch (c) { + case '\n': + writer.append("\\n"); + break; + case '"': + writer.append("\\\""); + break; + case '\\': + writer.append("\\\\"); + break; + default: + writer.append(c); + } + } + + } + + private Set parse(HttpServletRequest req) { + String[] includedParam = req.getParameterValues("name[]"); + return (Set) (includedParam == null ? Collections.emptySet() : new HashSet(Arrays.asList(includedParam))); + } + + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + this.doGet(req, resp); + } +} diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java index b4f6c378f..733379b54 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java @@ -31,6 +31,11 @@ import org.apache.rocketmq.connect.runtime.rest.entities.HttpResponse; import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +65,21 @@ public RestHandler(AbstractConnectController connectController) { this.connectController = connectController; pluginsResource = new ConnectorPluginsResource(connectController); + Javalin embeddedApp = Javalin.create(config -> { + config.server(() -> { + Server server = new Server(connectController.getConnectConfig().getExporterPort()); + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + context.addServlet(new ServletHolder(new PrometheusMetricsServlet()), "/metrics"); + ContextHandlerCollection handlers = new ContextHandlerCollection(); + handlers.setHandlers(new Handler[]{context}); + server.setHandler(handlers); + return server; + }); + }); + + embeddedApp.start(connectController.getConnectConfig().getExporterPort()); + Javalin app = Javalin.create(); app = app.start(connectController.getConnectConfig().getHttpPort());