From f2a468f7c691ee8d62e68953dce4d40f50cd06fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=80=E7=B1=B3=E9=98=B3=E5=85=89?= <776070848@qq.com> Date: Mon, 20 May 2024 18:07:31 +0800 Subject: [PATCH] URL supports placeholders and can be dynamically replaced with msg extensions --- .../rocketmq/connect/http/HttpSinkTask.java | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/HttpSinkTask.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/HttpSinkTask.java index d8a220c9..a6d0c96c 100644 --- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/HttpSinkTask.java +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/HttpSinkTask.java @@ -48,13 +48,17 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; public class HttpSinkTask extends SinkTask { private static final Logger log = LoggerFactory.getLogger(HttpSinkTask.class); + // the Regex Pattern like '{number}' + protected static final Pattern PATTERN = Pattern.compile("\\{(\\w+)\\}"); protected static final int DEFAULT_CONSUMER_TIMEOUT_SECONDS = 30; protected static final String DEFAULT_REQUEST_TIMEOUT_MILL_SECONDS = "3000"; protected static final int DEFAULT_OAUTH_DELAY_SECONDS = 1; @@ -105,8 +109,10 @@ public void put(List records) throws ConnectException { if (auth != null) { headerMap.putAll(auth.auth()); } + // replace the placeholder of url with extensions + String urlInsteadOfPlaceholder = formatUrl(url, connectRecord.getExtensions()); // render query to url - String urlWithQueryParameters = renderQueryParametersToUrl(url, queryParameters, fixedQueryParameters); + String urlWithQueryParameters = renderQueryParametersToUrl(urlInsteadOfPlaceholder, queryParameters, fixedQueryParameters); HttpRequest httpRequest = new HttpRequest(); httpRequest.setUrl(urlWithQueryParameters); httpRequest.setMethod(method); @@ -140,6 +146,32 @@ public void put(List records) throws ConnectException { } } + /** + * Get a formatted url that will replace a placeholder with Extension Values + * + * @param url the source url str + * @param extensions ConnectRecord Extension Values + * @return the formatted url + */ + private String formatUrl(String url, KeyValue extensions) { + if (!PATTERN.matcher(url).matches()) { + return url; + } + if (extensions != null && extensions.keySet() != null) { + Set keys = extensions.keySet(); + String template = url; + for (String key : keys) { + String value = extensions.getString(key); + if (StringUtils.isNotEmpty(value)) { + // simple replaced the placeholder + template = template.replace("{" + key + "}", value); + } + } + return template; + } + return url; + } + private Map renderHeaderMap(String headerParameters, String fixedHeaderParameters, String token) { Map headerMap = new HashMap<>(); if (headerParameters != null) {