diff --git a/src/main/java/se/yolean/kafka/keyvalue/ConsumerAtLeastOnce.java b/src/main/java/se/yolean/kafka/keyvalue/ConsumerAtLeastOnce.java index 83d0100..31693ee 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/ConsumerAtLeastOnce.java +++ b/src/main/java/se/yolean/kafka/keyvalue/ConsumerAtLeastOnce.java @@ -82,6 +82,8 @@ public enum Stage { .named("consume-loop") .up(); + Map currentOffsets = new HashMap<>(1); + public ConsumerAtLeastOnce() { runner = new Thread(this, "kafkaclient"); } @@ -223,6 +225,7 @@ void run(final KafkaConsumer consumer, final Map while (records.hasNext()) { ConsumerRecord record = records.next(); UpdateRecord update = new UpdateRecord(record.topic(), record.partition(), record.offset(), record.key()); + toStats(update); cache.put(record.key(), record.value()); Long start = nextUncommitted.get(update.getTopicPartition()); if (start == null) { @@ -249,9 +252,13 @@ void run(final KafkaConsumer consumer, final Map } + private void toStats(UpdateRecord update) { + currentOffsets.put(update.getTopicPartition(), update.getOffset()); + } + @Override public Long getCurrentOffset(String topicName, int partition) { - throw new UnsupportedOperationException("TODO implement"); + return currentOffsets.get(new TopicPartition(topicName, partition)); } @Override diff --git a/src/main/java/se/yolean/kafka/keyvalue/http/CacheResource.java b/src/main/java/se/yolean/kafka/keyvalue/http/CacheResource.java index 71bee6b..0ded218 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/http/CacheResource.java +++ b/src/main/java/se/yolean/kafka/keyvalue/http/CacheResource.java @@ -61,7 +61,7 @@ public byte[] valueByKey(@PathParam("key") final String key, @Context UriInfo ur @GET @Path("/offset/{topic}/{partition}") - @Produces(MediaType.APPLICATION_JSON) + @Produces(MediaType.TEXT_PLAIN) public Long getCurrentOffset(@PathParam("topic") String topic, @PathParam("partition") Integer partition) { if (topic == null) { throw new BadRequestException("Topic can not be null"); diff --git a/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java b/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java index 57ea09d..a63c3f3 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java +++ b/src/main/java/se/yolean/kafka/keyvalue/onupdate/hc/UpdatesDispatcherHttp.java @@ -1,14 +1,12 @@ package se.yolean.kafka.keyvalue.onupdate.hc; import java.io.IOException; -import java.io.UnsupportedEncodingException; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.BasicHttpClientConnectionManager; diff --git a/test-k/bash.yaml b/test-k/bash.yaml index e6be297..ada193a 100644 --- a/test-k/bash.yaml +++ b/test-k/bash.yaml @@ -38,9 +38,9 @@ spec: - name: kafka_offset_reset value: earliest - name: update_targets - value: http://127.0.0.1:8080/204/__topic__ + value: http://127.0.0.1:8080/204/__TOPIC__ # We should also test with other endpoints, with Envoy's fault injection - # http://127.0.0.1:8080/204/__topic__,http://127.0.0.1:8080/?topic=__topic__ + # http://127.0.0.1:8080/204/__TOPIC__,http://127.0.0.1:8080/?topic=__TOPIC__ - name: envoy image: envoyproxy/envoy:v1.10.0@sha256:bf7970f469c3d2cd54a472536342bd50df0ddf099ebd51024b7f13016c4ee3c4 ports: @@ -167,7 +167,26 @@ spec: - /bin/bash - -cex - | - echo "Running" + echo "Running ad-hoc smoketests here" + curl -s --retry-connrefused --retry 5 http://127.0.0.1:8090/health + curl -s --retry 5 http://127.0.0.1:8090/ready + curl -s http://127.0.0.1:8090/cache/v1/keys + curl -s http://127.0.0.1:8090/cache/v1/offset/kkv-test-bash/0 + curl -s http://127.0.0.1:8090/cache/v1/offset/kkv-test-bash/1 + curl -s http://127.0.0.1:8090/cache/v1/offset/kkv-test-bash/2 + time=$(date -Is) + echo testtime=$time | kafkacat -b bootstrap.kafka:9092 -P -K '=' -t kkv-test-bash + curl -s http://127.0.0.1:8080/stats/prometheus | grep envoy_server_total_connections + echo "Waiting for evidence of an onupdate request ..." + tail -n 0 -f /logs/envoy-access.log | head -n 1 + curl -s http://127.0.0.1:8080/stats/prometheus | grep envoy_server_total_connections + curl -s http://127.0.0.1:8090/cache/v1/raw/testtime + curl -s http://127.0.0.1:8090/cache/v1/values + curl -s http://127.0.0.1:8090/cache/v1/offset/kkv-test-bash/0 + curl -s http://127.0.0.1:8090/cache/v1/offset/kkv-test-bash/1 + curl -s http://127.0.0.1:8090/cache/v1/offset/kkv-test-bash/2 + curl -s http://127.0.0.1:8080/stats/prometheus | grep envoy_server_total_connections + echo "Sleeping here, for exec to run manual testing" sleep infinity volumeMounts: - name: logs