diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSON.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSON.java index 319dbcc..e61ebb6 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSON.java +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSON.java @@ -1,6 +1,7 @@ package se.yolean.kafka.keyvalue.onupdate; import java.io.OutputStream; +import java.util.HashMap; import java.util.Map; import javax.json.Json; @@ -26,11 +27,14 @@ public class UpdatesBodyPerTopicJSON implements UpdatesBodyPerTopic { private JsonObjectBuilder updates; private JsonObjectBuilder json; + Map headers = new HashMap(); + public UpdatesBodyPerTopicJSON(String topicName) { builder = Json.createObjectBuilder(); offsets = Json.createObjectBuilder(); updates = Json.createObjectBuilder(); json = builder.add(VERSION_KEY, 1).add(TOPIC_KEY, topicName); + headers.put(UpdatesBodyPerTopic.HEADER_TOPIC, topicName); } JsonObject getCurrent() { @@ -39,8 +43,8 @@ JsonObject getCurrent() { @Override public Map getHeaders() { - // TODO implement, but maybe we want to rewrite json serialization first - return java.util.Collections.emptyMap(); + headers.put(UpdatesBodyPerTopic.HEADER_OFFSETS, offsets.build().toString()); + return headers; } @Override diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java index 8d4fdc5..a4dca83 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java @@ -38,7 +38,7 @@ public UpdatesDispatcherHttp(String configuredTarget) { @Override public void dispatch(String topicName, UpdatesBodyPerTopic body) throws TargetAckFailedException { HttpPost post = new HttpPost(target.getHttpUriFromHost(topicName)); - post.addHeader(UpdatesBodyPerTopic.HEADER_TOPIC, topicName); // TODO body should declare all headers instead + body.getHeaders().forEach((name, value) -> post.setHeader(name, value)); post.setEntity(getEntity(body)); ResponseResult result; try { diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSONTest.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSONTest.java index 102faa1..d58c341 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSONTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSONTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.*; import java.io.UnsupportedEncodingException; +import java.util.Map; import org.junit.jupiter.api.Test; @@ -14,6 +15,9 @@ class UpdatesBodyPerTopicJSONTest { void testEmpty() throws UnsupportedEncodingException { UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t"); assertEquals("{\"v\":1,\"topic\":\"t\",\"offsets\":{},\"updates\":{}}", body.getContent()); + Map headers = body.getHeaders(); + assertEquals("t", headers.get(UpdatesBodyPerTopic.HEADER_TOPIC)); + assertEquals("", headers.get(UpdatesBodyPerTopic.HEADER_OFFSETS)); } @Test @@ -21,6 +25,9 @@ void test1() throws UnsupportedEncodingException { UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t1"); body.handle(new UpdateRecord("t", 1, 3, "k1")); assertEquals("{\"v\":1,\"topic\":\"t1\",\"offsets\":{\"1\":3},\"updates\":{\"k1\":{}}}", body.getContent()); + Map headers = body.getHeaders(); + assertEquals("t1", headers.get(UpdatesBodyPerTopic.HEADER_TOPIC)); + assertEquals("1=3", headers.get(UpdatesBodyPerTopic.HEADER_OFFSETS)); } @Test diff --git a/test-k/bash.yaml b/test-k/bash.yaml index 5ca3c1c..ce8b63a 100644 --- a/test-k/bash.yaml +++ b/test-k/bash.yaml @@ -91,6 +91,8 @@ spec: req_xid: "%REQ(X-REQUEST-ID)%" auth: "%REQ(:AUTHORITY)%" upstream_host: "%UPSTREAM_HOST%" + kkv_topic: "%REQ(X-KKV-TOPIC)%" + kkv_offsets: "%REQ(X-KKV-OFFSETS)%" route_config: name: boards_route virtual_hosts: