Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

URL supports placeholders and can be dynamically replaced with msg ex… #535

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,17 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

public class HttpSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(HttpSinkTask.class);
// the Regex Pattern like '{number}'
protected static final Pattern PATTERN = Pattern.compile("\\{(\\w+)\\}");
protected static final int DEFAULT_CONSUMER_TIMEOUT_SECONDS = 30;
protected static final String DEFAULT_REQUEST_TIMEOUT_MILL_SECONDS = "3000";
protected static final int DEFAULT_OAUTH_DELAY_SECONDS = 1;
Expand Down Expand Up @@ -105,8 +109,10 @@ public void put(List<ConnectRecord> records) throws ConnectException {
if (auth != null) {
headerMap.putAll(auth.auth());
}
// replace the placeholder of url with extensions
String urlInsteadOfPlaceholder = formatUrl(url, connectRecord.getExtensions());
// render query to url
String urlWithQueryParameters = renderQueryParametersToUrl(url, queryParameters, fixedQueryParameters);
String urlWithQueryParameters = renderQueryParametersToUrl(urlInsteadOfPlaceholder, queryParameters, fixedQueryParameters);
HttpRequest httpRequest = new HttpRequest();
httpRequest.setUrl(urlWithQueryParameters);
httpRequest.setMethod(method);
Expand Down Expand Up @@ -140,6 +146,32 @@ public void put(List<ConnectRecord> records) throws ConnectException {
}
}

/**
* Get a formatted url that will replace a placeholder with Extension Values
*
* @param url the source url str
* @param extensions ConnectRecord Extension Values
* @return the formatted url
*/
private String formatUrl(String url, KeyValue extensions) {
if (!PATTERN.matcher(url).matches()) {
return url;
}
if (extensions != null && extensions.keySet() != null) {
Set<String> keys = extensions.keySet();
String template = url;
for (String key : keys) {
String value = extensions.getString(key);
if (StringUtils.isNotEmpty(value)) {
// simple replaced the placeholder
template = template.replace("{" + key + "}", value);
}
}
return template;
}
return url;
}

private Map<String, String> renderHeaderMap(String headerParameters, String fixedHeaderParameters, String token) {
Map<String, String> headerMap = new HashMap<>();
if (headerParameters != null) {
Expand Down