Skip to content
This repository has been archived by the owner on Oct 9, 2020. It is now read-only.

Commit

Permalink
Use byte array to avoid encoding; let httpclient figure out coontent …
Browse files Browse the repository at this point in the history
…length
  • Loading branch information
solsson committed May 26, 2019
1 parent d0ab39f commit 901db23
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ public static String formatOffsetsHeader(Iterable<java.util.Map.Entry<Integer, I

String getContentType();

int getContentLength();

String getContent();
byte[] getContent();

/**
* @param out UTF-8 stream
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package se.yolean.kafka.keyvalue.onupdate;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonWriter;

import se.yolean.kafka.keyvalue.UpdateRecord;

Expand All @@ -24,10 +26,12 @@ public class UpdatesBodyPerTopicJSON implements UpdatesBodyPerTopic {

private JsonObjectBuilder builder;
private JsonObjectBuilder offsets;
private JsonObject offsetsBuilt = null;
private JsonObjectBuilder updates;
private JsonObjectBuilder json;

Map<String,String> headers = new HashMap<String, String>();
Map<String,String> headers = new HashMap<String, String>(2);
boolean built = false;

public UpdatesBodyPerTopicJSON(String topicName) {
builder = Json.createObjectBuilder();
Expand All @@ -38,12 +42,21 @@ public UpdatesBodyPerTopicJSON(String topicName) {
}

JsonObject getCurrent() {
return json.add(OFFSETS_KEY, offsets).add(UPDATES_KEY, updates).build();
if (built) {
throw new IllegalStateException("Refusing to serialize content twice");
}
if (offsetsBuilt == null) getHeaders();
built = true;
return json.add(OFFSETS_KEY, offsetsBuilt).add(UPDATES_KEY, updates).build();
}

@Override
public Map<String, String> getHeaders() {
headers.put(UpdatesBodyPerTopic.HEADER_OFFSETS, offsets.build().toString());
if (built) {
throw new IllegalStateException("Refusing to return headers after content");
}
offsetsBuilt = offsets.build();
headers.put(UpdatesBodyPerTopic.HEADER_OFFSETS, offsetsBuilt.toString());
return headers;
}

Expand All @@ -59,18 +72,16 @@ public void handle(UpdateRecord update) {
}

@Override
public String getContent() {
return getCurrent().toString();
}

@Override
public int getContentLength() {
throw new UnsupportedOperationException("not implemented");
public byte[] getContent() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
getContent(out);
return out.toByteArray();
}

@Override
public void getContent(OutputStream out) {
throw new UnsupportedOperationException("not implemented");
JsonWriter writer = Json.createWriter(out);
writer.write(getCurrent());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.http.HttpHost;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
Expand Down Expand Up @@ -54,12 +55,8 @@ public void dispatch(String topicName, UpdatesBodyPerTopic body) throws TargetAc
}

private HttpEntity getEntity(UpdatesBodyPerTopic body) {
StringEntity entity;
try {
entity = new StringEntity(body.getContent());
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
ByteArrayEntity entity;
entity = new ByteArrayEntity(body.getContent());
entity.setContentType(body.getContentType());
return entity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.junit.jupiter.api.Assertions.*;

import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Map;

Expand All @@ -14,28 +15,81 @@ class UpdatesBodyPerTopicJSONTest {
@Test
void testEmpty() throws UnsupportedEncodingException {
UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t");
assertEquals("{\"v\":1,\"topic\":\"t\",\"offsets\":{},\"updates\":{}}", body.getContent());
Map<String, String> headers = body.getHeaders();
assertEquals(
"{\"v\":1,\"topic\":\"t\",\"offsets\":{},\"updates\":{}}",
new String(body.getContent()));
assertEquals("application/json", body.getContentType());
assertEquals("t", headers.get(UpdatesBodyPerTopic.HEADER_TOPIC));
assertEquals("", headers.get(UpdatesBodyPerTopic.HEADER_OFFSETS));
assertEquals("{}", headers.get(UpdatesBodyPerTopic.HEADER_OFFSETS));
}

@Test
void test1() throws UnsupportedEncodingException {
UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t1");
body.handle(new UpdateRecord("t", 1, 3, "k1"));
assertEquals("{\"v\":1,\"topic\":\"t1\",\"offsets\":{\"1\":3},\"updates\":{\"k1\":{}}}", body.getContent());
Map<String, String> headers = body.getHeaders();
ByteArrayOutputStream content = new ByteArrayOutputStream();
body.getContent(content);
assertEquals(
"{\"v\":1,\"topic\":\"t1\",\"offsets\":{\"1\":3},\"updates\":{\"k1\":{}}}",
new String(content.toByteArray()));
assertEquals("t1", headers.get(UpdatesBodyPerTopic.HEADER_TOPIC));
assertEquals("1=3", headers.get(UpdatesBodyPerTopic.HEADER_OFFSETS));
assertEquals("{\"1\":3}", headers.get(UpdatesBodyPerTopic.HEADER_OFFSETS));
}

@Test
void test2() throws UnsupportedEncodingException {
UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t2");
body.handle(new UpdateRecord("t", 0, 10, "k1"));
body.handle(new UpdateRecord("t", 0, 11, "k2"));
assertEquals("{\"v\":1,\"topic\":\"t2\",\"offsets\":{\"0\":11},\"updates\":{\"k1\":{},\"k2\":{}}}", body.getContent());
assertEquals(
"{\"v\":1,\"topic\":\"t2\",\"offsets\":{\"0\":11},\"updates\":{\"k1\":{},\"k2\":{}}}",
new String(body.getContent()));
}

@Test
void testContentTwice() {
UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t2");
body.getContent();
try {
body.getContent();
} catch (IllegalStateException e) {
assertEquals("Refusing to serialize content twice", e.getMessage());
}
}

@Test
void testContentTwiceStream() {
UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t2");
body.getContent();
try {
body.getContent(new ByteArrayOutputStream());
} catch (IllegalStateException e) {
assertEquals("Refusing to serialize content twice", e.getMessage());
}
}

@Test
void testContentTwiceStreamFirst() {
UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t2");
body.getContent(new ByteArrayOutputStream());
try {
body.getContent();
} catch (IllegalStateException e) {
assertEquals("Refusing to serialize content twice", e.getMessage());
}
}

@Test
void testHeadersAfterContent() {
UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t2");
body.getContent(new ByteArrayOutputStream());
try {
body.getHeaders();
} catch (IllegalStateException e) {
assertEquals("Refusing to return headers after content", e.getMessage());
}
}

}
9 changes: 2 additions & 7 deletions test-k/bash.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,17 @@ spec:
config:
path: /logs/envoy-access.log
json_format:
ind: "enforcer access"
start_time: "%START_TIME%"
req_method: "%REQ(:METHOD)%"
req_path: "%REQ(X-ENVOY-ORIGINAL-PATH?:PATH)%"
protocol: "%PROTOCOL%"
resp_code: "%RESPONSE_CODE%"
resp_flags: "%RESPONSE_FLAGS%"
bytes_recv: "%BYTES_RECEIVED%"
bytes_sent: "%BYTES_SENT%"
duration: "%DURATION%"
resp_time: "%RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)%"
req_x_forward: "%REQ(X-FORWARDED-FOR)%"
agent: "%REQ(USER-AGENT)%"
req_xid: "%REQ(X-REQUEST-ID)%"
auth: "%REQ(:AUTHORITY)%"
upstream_host: "%UPSTREAM_HOST%"
content_type: "%REQ(CONTENT-TYPE)%"
content_length: "%REQ(CONTENT-LENGTH)%"
kkv_topic: "%REQ(X-KKV-TOPIC)%"
kkv_offsets: "%REQ(X-KKV-OFFSETS)%"
route_config:
Expand Down

0 comments on commit 901db23

Please sign in to comment.