Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

As-a-service, consume topic definitions and act accordingly #7

Closed
wants to merge 24 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
fc2ac68
More verbose (error) logging
atamon Oct 25, 2016
0a68698
Add build-target so we can build internally
atamon Oct 25, 2016
100b0a1
Merge remote-tracking branch 'origin/extend-logging'
solsson Nov 27, 2017
6fad336
wip convert to gradle build, Dockerfile is todo
solsson Nov 27, 2017
71212db
Adds dockerfile, with quite ok caching
solsson Nov 27, 2017
e116371
Adds a couple of dependencies we'll need for the service
solsson Nov 27, 2017
21ee78f
We're going to define the topic declaration schema at start
solsson Nov 28, 2017
bf60ed3
Adapted from Schema Registry's topic setup
solsson Nov 28, 2017
b467f5a
wip
solsson Nov 28, 2017
27f8370
Let's use an integration test to experiment with topic setup
solsson Nov 28, 2017
362b6c5
Apache 2.0 license
solsson Nov 28, 2017
eb2920d
I'm using docker-compose locally, with an extra advertised.listeners
solsson Nov 28, 2017
16b2d30
Lists current schemas
solsson Nov 28, 2017
5dc918c
We'll want to get topic declaration messages as their java representa…
solsson Nov 28, 2017
5979eb4
Shows that we have the current schema source to compare with registry…
solsson Nov 28, 2017
8c6e5b4
Uploads our local schema, if it's missing
solsson Nov 28, 2017
a1a000d
Evaluating concurrency and retry alternatives
solsson Nov 30, 2017
eb63aea
Testability! for anyone who has my docker-compose.yml :)
solsson Nov 30, 2017
55439ba
Tests error handling in AdminClient
solsson Dec 1, 2017
1426314
Reuse in tests, gotcha again
solsson Dec 1, 2017
88be59c
Got some sort of structure for integrating different services now
solsson Dec 1, 2017
7903634
Wanted to start, but more importantly stop, the service from a test
solsson Dec 1, 2017
e0057a2
Shares the executor service with the retry lib
solsson Dec 1, 2017
1fb1e0b
Runs in sequence for now, until we're more than a consumer
solsson Dec 1, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Testability! for anyone who has my docker-compose.yml :)
  • Loading branch information
solsson committed Nov 30, 2017

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit eb63aea9c0b81bc2f92a5f9afd95cd66feab83cb
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -36,6 +36,10 @@ dependencies {

compile group: 'io.confluent', name: 'kafka-schema-registry-client', version: '4.0.0'

compile group: 'com.nurkiewicz.asyncretry', name: 'asyncretry', version: '0.0.7'
//compile group: 'com.github.rholder', name: 'guava-retrying', version: '2.0.0'
compile group: 'com.evanlennick', name: 'retry4j', version: '0.7.2'

runtime group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25'
runtime group: 'org.slf4j', name: 'log4j-over-slf4j', version: '1.7.25'
compile group: 'structlog4j', name: 'structlog4j-api', version: '1.0.0'
60 changes: 60 additions & 0 deletions src/itest/java/se/yolean/kafka/topic/client/config/ItestProps.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package se.yolean.kafka.topic.client.config;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import com.google.inject.AbstractModule;
import com.google.inject.name.Names;

import io.prometheus.client.CollectorRegistry;
import se.yolean.kafka.topic.client.cli.Client;

public class ItestProps extends AbstractModule {

public static final CollectorRegistry PROMETHEUS_REGISTRY = new CollectorRegistry();

//public static final ItestProps DOCKER_COMPOSE = new ItestProps("itest-dockercompose.properties");
public static final ItestProps DOCKER_COMPOSE = new ItestProps(new File("src/itest/resources/itest-dockercompose.properties"));

private Properties config;

public ItestProps(String itestPropertiesFielnameInClasspathRoot) {
Properties properties = new Properties();
try {
InputStream defaultProperties = Client.class.getResourceAsStream(Client.DEFAULT_PROPERTIES_FILE);
properties.load(defaultProperties);
InputStream itestProperties = this.getClass().getResourceAsStream(itestPropertiesFielnameInClasspathRoot);
properties.load(itestProperties);
} catch (IOException e) {
throw new RuntimeException(e);
}
this.config = properties;
}

public ItestProps(File itestPropertiesFile) {
Properties properties = new Properties();
try {
FileReader defaults = new FileReader(new File("src/main/resources/" + Client.DEFAULT_PROPERTIES_FILE));
properties.load(defaults);
FileReader itest = new FileReader(itestPropertiesFile);
properties.load(itest);
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
this.config = properties;
}

@Override
protected void configure() {
System.out.print("Itest props: ");
this.config.list(System.out);
Names.bindProperties(super.binder(), this.config);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package se.yolean.kafka.topic.client.retryable;

import org.apache.kafka.clients.admin.AdminClient;
import org.junit.Test;

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;

import se.yolean.kafka.topic.client.config.ItestProps;
import se.yolean.kafka.topic.client.service.AdminClientProvider;

public class BrokerProbeIntegrationTest {

@Test
public void test() throws Exception {
Injector conf = Guice.createInjector(ItestProps.DOCKER_COMPOSE, new AbstractModule() {
@Override
protected void configure() {
bind(AdminClient.class).toProvider(AdminClientProvider.class);
}
});
BrokerProbe probe = conf.getInstance(BrokerProbe.class);
probe.call();
}

}
1 change: 1 addition & 0 deletions src/itest/resources/itest-dockercompose.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
bootstrap.servers=PLAINTEXT://localhost:9094
2 changes: 1 addition & 1 deletion src/main/java/se/yolean/kafka/topic/client/cli/Client.java
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@

public class Client {

final static String DEFAULT_PROPERTIES_FILE = "defaults.properties";
public final static String DEFAULT_PROPERTIES_FILE = "default.properties";

final static String topicName = System.getenv("TOPIC_NAME");
final static boolean resetTopic = Boolean.parseBoolean(System.getenv("RESET_TOPIC"));
Original file line number Diff line number Diff line change
@@ -16,9 +16,9 @@

import se.yolean.kafka.topic.client.config.ManagerConfigModule;
import se.yolean.kafka.topic.client.config.MetricsModule;
import se.yolean.kafka.topic.client.retryable.RestProxySetup;
import se.yolean.kafka.topic.client.retryable.SchemaRegistrySetup;
import se.yolean.kafka.topic.client.config.ExecutorRetryProviderForInit;
import se.yolean.kafka.topic.client.tasks.RestProxySetup;
import se.yolean.kafka.topic.client.tasks.SchemaRegistrySetup;

public class ManagedTopicsService {

@@ -35,6 +35,8 @@ public ManagedTopicsService(Properties config) {
MetricsModule.Exporter exporter = initContext.getInstance(MetricsModule.Exporter.class);
log.info("Metrics exporter", "status", exporter.getStatus(), "port", exporter.getHttpPort());



SchemaRegistrySetup shemaRegistry = initContext.getInstance(SchemaRegistrySetup.class);

RestProxySetup restProxy = initContext.getInstance(RestProxySetup.class);

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package se.yolean.kafka.topic.client.retryable;

import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.inject.Inject;
import javax.inject.Named;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;

import com.github.structlog4j.ILogger;
import com.github.structlog4j.SLoggerFactory;

import io.prometheus.client.Counter;

public class BrokerProbe implements Callable<BrokerProbe.KafkaStatus> {

private final ILogger log = SLoggerFactory.getLogger(this.getClass());

// Should be made configurable, but let's keep them short and work on back-off

static final Counter timeouts = Counter.build().name("timeouts").labelNames("broker_probe")
.help("AdminClient.describeCluster timeouts").register();

@Inject
private AdminClient adminClient;

@Inject
@Named("brokers.describe.timeout")
private int describeTimeoutMs = 1;

@Inject
@Named("brokers.describe.get.timeout")
private int nodesTimeoutMs = 10;

@Inject
@Named("brokers.available.min")
private int brokersAvailableMin;

@Override
public KafkaStatus call() throws Exception {
DescribeClusterOptions options = new DescribeClusterOptions();
options.timeoutMs(describeTimeoutMs);
DescribeClusterResult describe = adminClient.describeCluster(options);

KafkaFuture<Collection<Node>> nodesFuture = describe.nodes();

Collection<Node> nodes = null;
try {
nodes = nodesFuture.get(nodesTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted when waiting for controller status", e);
} catch (ExecutionException e) {
log.error("Execution error for controller status", e);
} catch (TimeoutException e) {
log.warn("Timeout waiting for controller response", "ms", nodesTimeoutMs, e);
timeouts.inc();
}

if (nodes == null) {
throw new Exception("No broker information available");
}
if (nodes.size() < brokersAvailableMin) {
throw new NotEnoughBrokersException(brokersAvailableMin, nodes.size());
}

return new KafkaStatus();
}

public static class KafkaStatus {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package se.yolean.kafka.topic.client.retryable;

public class NotEnoughBrokersException extends Exception {

private static final long serialVersionUID = 1L;

public NotEnoughBrokersException(int expected, int actual) {
super("Got " + actual + " brokers but at least " + expected + " is required");
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package se.yolean.kafka.topic.client.tasks;
package se.yolean.kafka.topic.client.retryable;

import com.github.structlog4j.ILogger;
import com.github.structlog4j.SLoggerFactory;
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package se.yolean.kafka.topic.client.tasks;
package se.yolean.kafka.topic.client.retryable;

import com.github.structlog4j.ILogger;
import com.github.structlog4j.SLoggerFactory;
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package se.yolean.kafka.topic.client.tasks;
package se.yolean.kafka.topic.client.retryable;

import java.util.concurrent.Callable;

90 changes: 0 additions & 90 deletions src/main/java/se/yolean/kafka/topic/client/tasks/BrokerProbe.java

This file was deleted.

This file was deleted.

This file was deleted.

3 changes: 3 additions & 0 deletions src/main/resources/default.properties
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
prometheus.exporter.port=5000
brokers.available.min=1
brokers.describe.timeout=10
brokers.describe.get.timeout=10