Skip to content
This repository has been archived by the owner on Oct 9, 2020. It is now read-only.

Commit

Permalink
Example usage. What's a good offsets endpoint?
Browse files Browse the repository at this point in the history
  • Loading branch information
solsson committed May 26, 2019
1 parent 901db23 commit 4ffd0b5
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public enum Stage {
.named("consume-loop")
.up();

Map<TopicPartition,Long> currentOffsets = new HashMap<>(1);

public ConsumerAtLeastOnce() {
runner = new Thread(this, "kafkaclient");
}
Expand Down Expand Up @@ -223,6 +225,7 @@ void run(final KafkaConsumer<String, byte[]> consumer, final Map<String, byte[]>
while (records.hasNext()) {
ConsumerRecord<String, byte[]> record = records.next();
UpdateRecord update = new UpdateRecord(record.topic(), record.partition(), record.offset(), record.key());
toStats(update);
cache.put(record.key(), record.value());
Long start = nextUncommitted.get(update.getTopicPartition());
if (start == null) {
Expand All @@ -249,9 +252,13 @@ void run(final KafkaConsumer<String, byte[]> consumer, final Map<String, byte[]>

}

private void toStats(UpdateRecord update) {
currentOffsets.put(update.getTopicPartition(), update.getOffset());
}

@Override
public Long getCurrentOffset(String topicName, int partition) {
throw new UnsupportedOperationException("TODO implement");
return currentOffsets.get(new TopicPartition(topicName, partition));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public byte[] valueByKey(@PathParam("key") final String key, @Context UriInfo ur

@GET
@Path("/offset/{topic}/{partition}")
@Produces(MediaType.APPLICATION_JSON)
@Produces(MediaType.TEXT_PLAIN)
public Long getCurrentOffset(@PathParam("topic") String topic, @PathParam("partition") Integer partition) {
if (topic == null) {
throw new BadRequestException("Topic can not be null");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package se.yolean.kafka.keyvalue.onupdate.hc;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
Expand Down
25 changes: 22 additions & 3 deletions test-k/bash.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ spec:
- name: kafka_offset_reset
value: earliest
- name: update_targets
value: http://127.0.0.1:8080/204/__topic__
value: http://127.0.0.1:8080/204/__TOPIC__
# We should also test with other endpoints, with Envoy's fault injection
# http://127.0.0.1:8080/204/__topic__,http://127.0.0.1:8080/?topic=__topic__
# http://127.0.0.1:8080/204/__TOPIC__,http://127.0.0.1:8080/?topic=__TOPIC__
- name: envoy
image: envoyproxy/envoy:v1.10.0@sha256:bf7970f469c3d2cd54a472536342bd50df0ddf099ebd51024b7f13016c4ee3c4
ports:
Expand Down Expand Up @@ -167,7 +167,26 @@ spec:
- /bin/bash
- -cex
- |
echo "Running"
echo "Running ad-hoc smoketests here"
curl -s --retry-connrefused --retry 5 http://127.0.0.1:8090/health
curl -s --retry 5 http://127.0.0.1:8090/ready
curl -s http://127.0.0.1:8090/cache/v1/keys
curl -s http://127.0.0.1:8090/cache/v1/offset/kkv-test-bash/0
curl -s http://127.0.0.1:8090/cache/v1/offset/kkv-test-bash/1
curl -s http://127.0.0.1:8090/cache/v1/offset/kkv-test-bash/2
time=$(date -Is)
echo testtime=$time | kafkacat -b bootstrap.kafka:9092 -P -K '=' -t kkv-test-bash
curl -s http://127.0.0.1:8080/stats/prometheus | grep envoy_server_total_connections
echo "Waiting for evidence of an onupdate request ..."
tail -n 0 -f /logs/envoy-access.log | head -n 1
curl -s http://127.0.0.1:8080/stats/prometheus | grep envoy_server_total_connections
curl -s http://127.0.0.1:8090/cache/v1/raw/testtime
curl -s http://127.0.0.1:8090/cache/v1/values
curl -s http://127.0.0.1:8090/cache/v1/offset/kkv-test-bash/0
curl -s http://127.0.0.1:8090/cache/v1/offset/kkv-test-bash/1
curl -s http://127.0.0.1:8090/cache/v1/offset/kkv-test-bash/2
curl -s http://127.0.0.1:8080/stats/prometheus | grep envoy_server_total_connections
echo "Sleeping here, for exec to run manual testing"
sleep infinity
volumeMounts:
- name: logs
Expand Down

0 comments on commit 4ffd0b5

Please sign in to comment.