From 98ea58afb3a2c183ccfbffe4ac62c14ff431e134 Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Sun, 26 May 2019 18:04:02 +0200 Subject: [PATCH] Failed to pass host this way --- .../kafka/keyvalue/onupdate/OnUpdateForwarder.java | 6 ++++-- .../keyvalue/onupdate/hc/UpdatesDispatcherHttp.java | 9 +++++++-- .../keyvalue/onupdate/OnUpdateForwarderTest.java | 12 +++++++++++- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateForwarder.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateForwarder.java index 257f87a..4372303 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateForwarder.java +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateForwarder.java @@ -111,8 +111,10 @@ void updateDispatchersFromConfig() { logger.info("The list of {} update targets is ready", dispatchers); } - private List getTargetsConfig() { - return java.util.Arrays.asList(targetsConfig.orElse("").split(TARGETS_CONFIG_SEPARATOR_REGEX)); + List getTargetsConfig() { + String conf = targetsConfig.orElse(null); + if (conf == null) return Collections.emptyList(); + return java.util.Arrays.asList(conf.split(TARGETS_CONFIG_SEPARATOR_REGEX)); } void stopDispatcher(UpdatesDispatcher dispatcher) { diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java index 32b090c..18f70ba 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java @@ -7,6 +7,7 @@ import org.apache.http.HttpHost; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; @@ -25,24 +26,28 @@ public class UpdatesDispatcherHttp implements UpdatesDispatcher { ResponseHandlerAck responseHandler = new ResponseHandlerAck(); UpdateTarget target; CloseableHttpClient client; + HttpClientContext context; public UpdatesDispatcherHttp(String configuredTarget) { target = new UpdateTarget(configuredTarget); HttpHost host = target.getHttpclientContextHost(); // If we want to manage contexts logger.info("Creating http client for host {} target {}", host, target); + context = HttpClientContext.create(); + context.setTargetHost(host); + BasicHttpClientConnectionManager connectionManager = new BasicHttpClientConnectionManager(); client = HttpClients.createMinimal(connectionManager); } @Override public void dispatch(String topicName, UpdatesBodyPerTopic body) throws TargetAckFailedException { - HttpPost post = new HttpPost("http://localhost/"); + HttpPost post = new HttpPost(target.getHttpUriFromHost(topicName)); post.addHeader(UpdatesBodyPerTopic.HEADER_TOPIC, topicName); // TODO body should declare all headers instead post.setEntity(getEntity(body)); ResponseResult result; try { - result = client.execute(post, responseHandler); + result = client.execute(post, responseHandler, context); } catch (ClientProtocolException e) { throw new TargetAckFailedException(e); } catch (IOException e) { diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateForwarderTest.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateForwarderTest.java index 847880e..a262a05 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateForwarderTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/onupdate/OnUpdateForwarderTest.java @@ -2,13 +2,23 @@ import static org.junit.jupiter.api.Assertions.*; +import java.util.Optional; + import org.junit.jupiter.api.Test; class OnUpdateForwarderTest { @Test - void testGetDispatchers() { + void testGetTargetsConfig() { OnUpdateForwarder forwarder = new OnUpdateForwarder(); + + forwarder.targetsConfig = Optional.empty(); + assertNotNull(forwarder.getTargetsConfig()); + assertEquals(0, forwarder.getTargetsConfig().size()); + + forwarder.targetsConfig = Optional.of("http://example.net/"); + assertEquals(1, forwarder.getTargetsConfig().size()); + assertEquals("http://example.net/", forwarder.getTargetsConfig().get(0)); } }