diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopic.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopic.java index d04eecd..0380eb9 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopic.java +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopic.java @@ -19,9 +19,7 @@ public static String formatOffsetsHeader(Iterable headers = new HashMap(); + Map headers = new HashMap(2); + boolean built = false; public UpdatesBodyPerTopicJSON(String topicName) { builder = Json.createObjectBuilder(); @@ -38,12 +42,21 @@ public UpdatesBodyPerTopicJSON(String topicName) { } JsonObject getCurrent() { - return json.add(OFFSETS_KEY, offsets).add(UPDATES_KEY, updates).build(); + if (built) { + throw new IllegalStateException("Refusing to serialize content twice"); + } + if (offsetsBuilt == null) getHeaders(); + built = true; + return json.add(OFFSETS_KEY, offsetsBuilt).add(UPDATES_KEY, updates).build(); } @Override public Map getHeaders() { - headers.put(UpdatesBodyPerTopic.HEADER_OFFSETS, offsets.build().toString()); + if (built) { + throw new IllegalStateException("Refusing to return headers after content"); + } + offsetsBuilt = offsets.build(); + headers.put(UpdatesBodyPerTopic.HEADER_OFFSETS, offsetsBuilt.toString()); return headers; } @@ -59,18 +72,16 @@ public void handle(UpdateRecord update) { } @Override - public String getContent() { - return getCurrent().toString(); - } - - @Override - public int getContentLength() { - throw new UnsupportedOperationException("not implemented"); + public byte[] getContent() { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + getContent(out); + return out.toByteArray(); } @Override public void getContent(OutputStream out) { - throw new UnsupportedOperationException("not implemented"); + JsonWriter writer = Json.createWriter(out); + writer.write(getCurrent()); } } diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java index a4dca83..57ea09d 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java @@ -7,6 +7,7 @@ import org.apache.http.HttpHost; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; @@ -54,12 +55,8 @@ public void dispatch(String topicName, UpdatesBodyPerTopic body) throws TargetAc } private HttpEntity getEntity(UpdatesBodyPerTopic body) { - StringEntity entity; - try { - entity = new StringEntity(body.getContent()); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } + ByteArrayEntity entity; + entity = new ByteArrayEntity(body.getContent()); entity.setContentType(body.getContentType()); return entity; } diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSONTest.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSONTest.java index d58c341..594bc4b 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSONTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSONTest.java @@ -2,6 +2,7 @@ import static org.junit.jupiter.api.Assertions.*; +import java.io.ByteArrayOutputStream; import java.io.UnsupportedEncodingException; import java.util.Map; @@ -14,20 +15,27 @@ class UpdatesBodyPerTopicJSONTest { @Test void testEmpty() throws UnsupportedEncodingException { UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t"); - assertEquals("{\"v\":1,\"topic\":\"t\",\"offsets\":{},\"updates\":{}}", body.getContent()); Map headers = body.getHeaders(); + assertEquals( + "{\"v\":1,\"topic\":\"t\",\"offsets\":{},\"updates\":{}}", + new String(body.getContent())); + assertEquals("application/json", body.getContentType()); assertEquals("t", headers.get(UpdatesBodyPerTopic.HEADER_TOPIC)); - assertEquals("", headers.get(UpdatesBodyPerTopic.HEADER_OFFSETS)); + assertEquals("{}", headers.get(UpdatesBodyPerTopic.HEADER_OFFSETS)); } @Test void test1() throws UnsupportedEncodingException { UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t1"); body.handle(new UpdateRecord("t", 1, 3, "k1")); - assertEquals("{\"v\":1,\"topic\":\"t1\",\"offsets\":{\"1\":3},\"updates\":{\"k1\":{}}}", body.getContent()); Map headers = body.getHeaders(); + ByteArrayOutputStream content = new ByteArrayOutputStream(); + body.getContent(content); + assertEquals( + "{\"v\":1,\"topic\":\"t1\",\"offsets\":{\"1\":3},\"updates\":{\"k1\":{}}}", + new String(content.toByteArray())); assertEquals("t1", headers.get(UpdatesBodyPerTopic.HEADER_TOPIC)); - assertEquals("1=3", headers.get(UpdatesBodyPerTopic.HEADER_OFFSETS)); + assertEquals("{\"1\":3}", headers.get(UpdatesBodyPerTopic.HEADER_OFFSETS)); } @Test @@ -35,7 +43,53 @@ void test2() throws UnsupportedEncodingException { UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t2"); body.handle(new UpdateRecord("t", 0, 10, "k1")); body.handle(new UpdateRecord("t", 0, 11, "k2")); - assertEquals("{\"v\":1,\"topic\":\"t2\",\"offsets\":{\"0\":11},\"updates\":{\"k1\":{},\"k2\":{}}}", body.getContent()); + assertEquals( + "{\"v\":1,\"topic\":\"t2\",\"offsets\":{\"0\":11},\"updates\":{\"k1\":{},\"k2\":{}}}", + new String(body.getContent())); + } + + @Test + void testContentTwice() { + UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t2"); + body.getContent(); + try { + body.getContent(); + } catch (IllegalStateException e) { + assertEquals("Refusing to serialize content twice", e.getMessage()); + } + } + + @Test + void testContentTwiceStream() { + UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t2"); + body.getContent(); + try { + body.getContent(new ByteArrayOutputStream()); + } catch (IllegalStateException e) { + assertEquals("Refusing to serialize content twice", e.getMessage()); + } + } + + @Test + void testContentTwiceStreamFirst() { + UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t2"); + body.getContent(new ByteArrayOutputStream()); + try { + body.getContent(); + } catch (IllegalStateException e) { + assertEquals("Refusing to serialize content twice", e.getMessage()); + } + } + + @Test + void testHeadersAfterContent() { + UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t2"); + body.getContent(new ByteArrayOutputStream()); + try { + body.getHeaders(); + } catch (IllegalStateException e) { + assertEquals("Refusing to return headers after content", e.getMessage()); + } } } diff --git a/test-k/bash.yaml b/test-k/bash.yaml index ce8b63a..e6be297 100644 --- a/test-k/bash.yaml +++ b/test-k/bash.yaml @@ -75,22 +75,17 @@ spec: config: path: /logs/envoy-access.log json_format: - ind: "enforcer access" start_time: "%START_TIME%" req_method: "%REQ(:METHOD)%" req_path: "%REQ(X-ENVOY-ORIGINAL-PATH?:PATH)%" - protocol: "%PROTOCOL%" resp_code: "%RESPONSE_CODE%" resp_flags: "%RESPONSE_FLAGS%" bytes_recv: "%BYTES_RECEIVED%" bytes_sent: "%BYTES_SENT%" duration: "%DURATION%" - resp_time: "%RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)%" - req_x_forward: "%REQ(X-FORWARDED-FOR)%" agent: "%REQ(USER-AGENT)%" - req_xid: "%REQ(X-REQUEST-ID)%" - auth: "%REQ(:AUTHORITY)%" - upstream_host: "%UPSTREAM_HOST%" + content_type: "%REQ(CONTENT-TYPE)%" + content_length: "%REQ(CONTENT-LENGTH)%" kkv_topic: "%REQ(X-KKV-TOPIC)%" kkv_offsets: "%REQ(X-KKV-OFFSETS)%" route_config: