Skip to content

Commit

Permalink
Java Docs & kafka-test module (#35)
Browse files Browse the repository at this point in the history
- Add java docs (requirement for publishing to Maven Central)
- Move `SchemaRegistryContainer` into new `kafka-test` module.
  • Loading branch information
big-andy-coates authored Jan 25, 2023
1 parent db885e2 commit 7bce533
Show file tree
Hide file tree
Showing 30 changed files with 560 additions and 113 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ subprojects {
targetCompatibility = JavaVersion.VERSION_11

withSourcesJar()
withJavadocJar()
}

extra.apply {
Expand Down
23 changes: 23 additions & 0 deletions kafka-test/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
plugins {
`java-library`
}

val kafkaVersion : String by extra
val spotBugsVersion : String by extra
val jacksonVersion : String by extra
val testcontainersVersion : String by extra
val lombokVersion : String by extra
val confluentVersion : String by extra

dependencies {
implementation(project(":kafka"))

compileOnly("org.projectlombok:lombok:$lombokVersion")
annotationProcessor("org.projectlombok:lombok:$lombokVersion")
testCompileOnly("org.projectlombok:lombok:$lombokVersion")
testAnnotationProcessor("org.projectlombok:lombok:$lombokVersion")

implementation("org.testcontainers:testcontainers:$testcontainersVersion")
implementation("org.testcontainers:kafka:$testcontainersVersion")
testImplementation("org.testcontainers:junit-jupiter:$testcontainersVersion")
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,49 @@
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

/**
* Test container for the Schema Registry
*/
public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {

private static final String SCHEMA_REGISTRY_DOCKER_IMAGE_NAME = "confluentinc/cp-schema-registry:6.0.2";
private static final DockerImageName SCHEMA_REGISTRY_DOCKER_IMAGE = DockerImageName
.parse(SCHEMA_REGISTRY_DOCKER_IMAGE_NAME);

/**
* Port the SR will listen on.
*/
public static final int SCHEMA_REGISTRY_PORT = 8081;

/**
* @param version
* docker image version of schema registry
*/
public SchemaRegistryContainer(final String version) {
super(SCHEMA_REGISTRY_DOCKER_IMAGE.withTag(version));
withExposedPorts(SCHEMA_REGISTRY_PORT);
}

/**
* Link to Kafka container
*
* @param kafka
* kafka container
* @return self.
*/
public SchemaRegistryContainer withKafka(final KafkaContainer kafka) {
return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":9092");
}

/**
* Link to Network with Kafka
*
* @param network
* the network Kafka is running on
* @param bootstrapServers
* the Kafka bootstrap servers
* @return self.
*/
public SchemaRegistryContainer withKafka(final Network network, final String bootstrapServers) {
withNetwork(network);
withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry");
Expand All @@ -30,6 +57,9 @@ public SchemaRegistryContainer withKafka(final Network network, final String boo
return self();
}

/**
* @return Url of the SR.
*/
public String getUrl() {
return "http://" + getHost() + ":" + getMappedPort(SCHEMA_REGISTRY_PORT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
Expand Down Expand Up @@ -92,7 +94,7 @@ void shouldProvisionProduceAndConsumeUsingAvroWithSpeccy() throws Exception {
final ConsumerRecords<Long, UserSignedUp> consumerRecords = consumer.poll(Duration.ofSeconds(10));
assertThat(consumerRecords, is(notNullValue()));
assertThat(consumerRecords.count(), is(1));
assertThat(consumerRecords.iterator().next().value(), is(sentRecord));
MatcherAssert.assertThat(consumerRecords.iterator().next().value(), Matchers.is(sentRecord));
}

@Order(2)
Expand Down Expand Up @@ -126,7 +128,7 @@ void shouldProvisionProduceAndConsumeProtoWithSpeccyClient() throws Exception {
final ConsumerRecords<Long, UserInfo> consumerRecords = consumer.poll(Duration.ofSeconds(10));
assertThat(consumerRecords, is(notNullValue()));
assertThat(consumerRecords.count(), is(1));
assertThat(consumerRecords.iterator().next().value(), is(userSam));
MatcherAssert.assertThat(consumerRecords.iterator().next().value(), Matchers.is(userSam));
}

@Order(3)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package simple.schema_demo._public.user_checkout_value;


import com.fasterxml.jackson.annotation.JsonProperty;
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject;
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaString;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonSchemaInject(strings = {
@JsonSchemaString(path = "javaType", value = "simple.schema_demo._public.user_checkout_value.UserCheckout")})
public class UserCheckout {
@JsonProperty
long id;
@JsonProperty
String name;
@JsonProperty
int price;
@JsonProperty
String systemDate;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package simple.schema_demo._public.user_signed_up_value;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserSignedUp {
String fullName;
String email;
int age;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"$schema":"http://json-schema.org/draft-07/schema#",
"title":"User Checkout",
"type":"object",
"additionalProperties":false,
"javaType":"simple.schema_demo._public.user_checkout_value.UserCheckout",
"properties":{
"id":{
"type":"integer"
},
"name":{
"oneOf":[
{
"type":"null",
"title":"Not included"
},
{
"type":"string"
}
]
},
"price":{
"type":"integer"
},
"systemDate":{
"oneOf":[
{
"type":"null",
"title":"Not included"
},
{
"type":"string"
}
]
}
},
"required":[
"id",
"price"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"type": "record",
"namespace": "simple.schema_demo._public.user_signed_up_value",
"name": "UserSignedUp",
"fields": [
{"name": "fullName", "type": "string"},
{"name": "email", "type": "string"},
{"name": "age", "type": "int"}
]
}
96 changes: 96 additions & 0 deletions kafka/src/main/java/io/specmesh/kafka/Clients.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,54 @@
import org.apache.kafka.streams.StreamsConfig;
import org.jetbrains.annotations.NotNull;

/**
* Factory for Kafka clients
*/
public final class Clients {
private Clients() {
}

/**
* Create a Kafka producer
*
* @param keyClass
* the type of the key
* @param valueClass
* the type of the value
* @param producerProperties
* the properties
* @param <K>
* the type of the key
* @param <V>
* the type of the value
* @return the producer
*/
public static <K, V> KafkaProducer<K, V> producer(final Class<K> keyClass, final Class<V> valueClass,
final Map<String, Object> producerProperties) {
return new KafkaProducer<>(producerProperties);
}

/**
* Create a map of producer properties with sensible defaults.
*
* @param domainId
* the domain id, used to scope resource names.
* @param serviceId
* the name of the service
* @param bootstrapServers
* bootstrap servers config
* @param schemaRegistryUrl
* url of schema registry
* @param keySerializerClass
* type of key serializer
* @param valueSerializerClass
* type of value serializer
* @param acksAll
* require acks from all replicas?
* @param providedProperties
* additional props
* @return props
*/
@SuppressWarnings("checkstyle:ParameterNumber")
@NotNull
public static Map<String, Object> producerProperties(final String domainId, final String serviceId,
Expand All @@ -45,6 +84,27 @@ public static Map<String, Object> producerProperties(final String domainId, fina
providedProperties);
}

/**
* Create props for KStream app with sensible defaults.
*
* @param domainId
* the domain id, used to scope resource names.
* @param serviceId
* the name of the service
* @param bootstrapServers
* bootstrap servers config
* @param schemaRegistryUrl
* url of schema registry
* @param keySerdeClass
* type of key serde
* @param valueSerdeClass
* type of value serde
* @param acksAll
* require acks from all replicas?
* @param providedProperties
* additional props
* @return the streams properties.
*/
@SuppressWarnings("checkstyle:ParameterNumber")
@NotNull
public static Map<String, Object> kstreamsProperties(final String domainId, final String serviceId,
Expand All @@ -69,11 +129,47 @@ public static Map<String, Object> kstreamsProperties(final String domainId, fina
KafkaAvroSerializerConfig.USE_LATEST_VERSION, "true"), providedProperties);
}

/**
* Create a Kafka consumer
*
* @param keyClass
* the type of the key
* @param valueClass
* the type of the value
* @param consumerProperties
* the properties
* @param <K>
* the type of the key
* @param <V>
* the type of the value
* @return the producer
*/
public static <K, V> KafkaConsumer<K, V> consumer(final Class<K> keyClass, final Class<V> valueClass,
final Map<String, Object> consumerProperties) {
return new KafkaConsumer<>(consumerProperties);
}

/**
* Create a map of consumer properties with sensible defaults.
*
* @param domainId
* the domain id, used to scope resource names.
* @param serviceId
* the name of the service
* @param bootstrapServers
* bootstrap servers config
* @param schemaRegistryUrl
* url of schema registry
* @param keyDeserializerClass
* type of key deserializer
* @param valueDeserializerClass
* type of value deserializer
* @param autoOffsetResetEarliest
* reset to earliest offset if no stored offsets?
* @param providedProperties
* additional props
* @return props
*/
@SuppressWarnings("checkstyle:ParameterNumber")
@NotNull
public static Map<String, Object> consumerProperties(final String domainId, final String serviceId,
Expand Down
Loading

0 comments on commit 7bce533

Please sign in to comment.