From 7bce533072c0170baff89feec6f9a4ea3a4f76f1 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Wed, 25 Jan 2023 15:19:22 +0000 Subject: [PATCH] Java Docs & kafka-test module (#35) - Add java docs (requirement for publishing to Maven Central) - Move `SchemaRegistryContainer` into new `kafka-test` module. --- build.gradle.kts | 1 + kafka-test/build.gradle.kts | 23 ++++ .../specmesh/kafka/AbstractContainerTest.java | 0 .../kafka/schema/SchemaRegistryContainer.java | 30 +++++ .../kafka/ClientsFunctionalDemoTest.java | 6 +- .../SimpleSchemaDemoPublicUserInfo.java | 0 ...impleSchemaDemoPublicUserInfoEnriched.java | 103 ++++++++---------- .../user_checkout_value/UserCheckout.java | 25 +++++ .../user_signed_up_value/UserSignedUp.java | 16 +++ ...mple.schema_demo._public.user_checkout.yml | 41 +++++++ ...simple.schema_demo._public.user_info.proto | 0 ...hema_demo._public.user_info_enriched.proto | 0 ...le.schema_demo._public.user_signed_up.avsc | 10 ++ .../resources/simple_schema_demo-api.yaml | 0 .../main/java/io/specmesh/kafka/Clients.java | 96 ++++++++++++++++ .../java/io/specmesh/kafka/KafkaApiSpec.java | 31 +++++- .../java/io/specmesh/kafka/Provisioner.java | 54 +++++++-- .../io/specmesh/kafka/schema/JsonSchemas.java | 28 ++++- .../kafka/schema/RegisteredSchema.java | 4 + .../kafka/schema/SrSchemaManager.java | 42 +++++++ .../io/specmesh/apiparser/AsyncApiParser.java | 12 ++ .../io/specmesh/apiparser/model/ApiSpec.java | 17 ++- .../io/specmesh/apiparser/model/Bindings.java | 3 + .../io/specmesh/apiparser/model/Channel.java | 3 + .../apiparser/model/KafkaBinding.java | 44 +++++--- .../io/specmesh/apiparser/model/Message.java | 37 ++++--- .../specmesh/apiparser/model/Operation.java | 25 +++-- .../specmesh/apiparser/model/SchemaInfo.java | 17 +++ .../java/io/specmesh/apiparser/model/Tag.java | 3 + settings.gradle.kts | 2 +- 30 files changed, 560 insertions(+), 113 deletions(-) create mode 100644 kafka-test/build.gradle.kts rename {kafka/src/test => kafka-test/src/main}/java/io/specmesh/kafka/AbstractContainerTest.java (100%) rename {kafka/src/test => kafka-test/src/main}/java/io/specmesh/kafka/schema/SchemaRegistryContainer.java (71%) rename {kafka => kafka-test}/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java (97%) rename {kafka => kafka-test}/src/test/java/io/specmesh/kafka/schema/SimpleSchemaDemoPublicUserInfo.java (100%) rename {kafka => kafka-test}/src/test/java/io/specmesh/kafka/schema/SimpleSchemaDemoPublicUserInfoEnriched.java (85%) create mode 100644 kafka-test/src/test/java/simple/schema_demo/_public/user_checkout_value/UserCheckout.java create mode 100644 kafka-test/src/test/java/simple/schema_demo/_public/user_signed_up_value/UserSignedUp.java create mode 100644 kafka-test/src/test/resources/schema/simple.schema_demo._public.user_checkout.yml rename {kafka => kafka-test}/src/test/resources/schema/simple.schema_demo._public.user_info.proto (100%) rename {kafka => kafka-test}/src/test/resources/schema/simple.schema_demo._public.user_info_enriched.proto (100%) create mode 100644 kafka-test/src/test/resources/schema/simple.schema_demo._public.user_signed_up.avsc rename {kafka => kafka-test}/src/test/resources/simple_schema_demo-api.yaml (100%) diff --git a/build.gradle.kts b/build.gradle.kts index ad0fafdc..82d10c5a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -45,6 +45,7 @@ subprojects { targetCompatibility = JavaVersion.VERSION_11 withSourcesJar() + withJavadocJar() } extra.apply { diff --git 
a/kafka-test/build.gradle.kts b/kafka-test/build.gradle.kts new file mode 100644 index 00000000..4c602df4 --- /dev/null +++ b/kafka-test/build.gradle.kts @@ -0,0 +1,23 @@ +plugins { + `java-library` +} + +val kafkaVersion : String by extra +val spotBugsVersion : String by extra +val jacksonVersion : String by extra +val testcontainersVersion : String by extra +val lombokVersion : String by extra +val confluentVersion : String by extra + +dependencies { + implementation(project(":kafka")) + + compileOnly("org.projectlombok:lombok:$lombokVersion") + annotationProcessor("org.projectlombok:lombok:$lombokVersion") + testCompileOnly("org.projectlombok:lombok:$lombokVersion") + testAnnotationProcessor("org.projectlombok:lombok:$lombokVersion") + + implementation("org.testcontainers:testcontainers:$testcontainersVersion") + implementation("org.testcontainers:kafka:$testcontainersVersion") + testImplementation("org.testcontainers:junit-jupiter:$testcontainersVersion") +} \ No newline at end of file diff --git a/kafka/src/test/java/io/specmesh/kafka/AbstractContainerTest.java b/kafka-test/src/main/java/io/specmesh/kafka/AbstractContainerTest.java similarity index 100% rename from kafka/src/test/java/io/specmesh/kafka/AbstractContainerTest.java rename to kafka-test/src/main/java/io/specmesh/kafka/AbstractContainerTest.java diff --git a/kafka/src/test/java/io/specmesh/kafka/schema/SchemaRegistryContainer.java b/kafka-test/src/main/java/io/specmesh/kafka/schema/SchemaRegistryContainer.java similarity index 71% rename from kafka/src/test/java/io/specmesh/kafka/schema/SchemaRegistryContainer.java rename to kafka-test/src/main/java/io/specmesh/kafka/schema/SchemaRegistryContainer.java index b5171377..8e41be82 100644 --- a/kafka/src/test/java/io/specmesh/kafka/schema/SchemaRegistryContainer.java +++ b/kafka-test/src/main/java/io/specmesh/kafka/schema/SchemaRegistryContainer.java @@ -6,22 +6,49 @@ import org.testcontainers.containers.Network; import org.testcontainers.utility.DockerImageName; +/** + * Test container for the Schema Registry + */ public class SchemaRegistryContainer extends GenericContainer { private static final String SCHEMA_REGISTRY_DOCKER_IMAGE_NAME = "confluentinc/cp-schema-registry:6.0.2"; private static final DockerImageName SCHEMA_REGISTRY_DOCKER_IMAGE = DockerImageName .parse(SCHEMA_REGISTRY_DOCKER_IMAGE_NAME); + + /** + * Port the SR will listen on. + */ public static final int SCHEMA_REGISTRY_PORT = 8081; + /** + * @param version + * docker image version of schema registry + */ public SchemaRegistryContainer(final String version) { super(SCHEMA_REGISTRY_DOCKER_IMAGE.withTag(version)); withExposedPorts(SCHEMA_REGISTRY_PORT); } + /** + * Link to Kafka container + * + * @param kafka + * kafka container + * @return self. + */ public SchemaRegistryContainer withKafka(final KafkaContainer kafka) { return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":9092"); } + /** + * Link to Network with Kafka + * + * @param network + * the network Kafka is running on + * @param bootstrapServers + * the Kafka bootstrap servers + * @return self. + */ public SchemaRegistryContainer withKafka(final Network network, final String bootstrapServers) { withNetwork(network); withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry"); @@ -30,6 +57,9 @@ public SchemaRegistryContainer withKafka(final Network network, final String boo return self(); } + /** + * @return Url of the SR. 
+ */ public String getUrl() { return "http://" + getHost() + ":" + getMappedPort(SCHEMA_REGISTRY_PORT); } diff --git a/kafka/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java b/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java similarity index 97% rename from kafka/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java rename to kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java index e7ad06d8..d9064bd2 100644 --- a/kafka/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java +++ b/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java @@ -42,6 +42,8 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; @@ -92,7 +94,7 @@ void shouldProvisionProduceAndConsumeUsingAvroWithSpeccy() throws Exception { final ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(10)); assertThat(consumerRecords, is(notNullValue())); assertThat(consumerRecords.count(), is(1)); - assertThat(consumerRecords.iterator().next().value(), is(sentRecord)); + MatcherAssert.assertThat(consumerRecords.iterator().next().value(), Matchers.is(sentRecord)); } @Order(2) @@ -126,7 +128,7 @@ void shouldProvisionProduceAndConsumeProtoWithSpeccyClient() throws Exception { final ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(10)); assertThat(consumerRecords, is(notNullValue())); assertThat(consumerRecords.count(), is(1)); - assertThat(consumerRecords.iterator().next().value(), is(userSam)); + MatcherAssert.assertThat(consumerRecords.iterator().next().value(), Matchers.is(userSam)); } @Order(3) diff --git a/kafka/src/test/java/io/specmesh/kafka/schema/SimpleSchemaDemoPublicUserInfo.java b/kafka-test/src/test/java/io/specmesh/kafka/schema/SimpleSchemaDemoPublicUserInfo.java similarity index 100% rename from kafka/src/test/java/io/specmesh/kafka/schema/SimpleSchemaDemoPublicUserInfo.java rename to kafka-test/src/test/java/io/specmesh/kafka/schema/SimpleSchemaDemoPublicUserInfo.java diff --git a/kafka/src/test/java/io/specmesh/kafka/schema/SimpleSchemaDemoPublicUserInfoEnriched.java b/kafka-test/src/test/java/io/specmesh/kafka/schema/SimpleSchemaDemoPublicUserInfoEnriched.java similarity index 85% rename from kafka/src/test/java/io/specmesh/kafka/schema/SimpleSchemaDemoPublicUserInfoEnriched.java rename to kafka-test/src/test/java/io/specmesh/kafka/schema/SimpleSchemaDemoPublicUserInfoEnriched.java index 6823eff8..d5a386e8 100644 --- a/kafka/src/test/java/io/specmesh/kafka/schema/SimpleSchemaDemoPublicUserInfoEnriched.java +++ b/kafka-test/src/test/java/io/specmesh/kafka/schema/SimpleSchemaDemoPublicUserInfoEnriched.java @@ -97,15 +97,14 @@ public final com.google.protobuf.UnknownFieldSet getUnknownFields() { return this.unknownFields; } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.internal_static_io_specmesh_kafka_schema_UserInfoEnriched_descriptor; + return SimpleSchemaDemoPublicUserInfoEnriched.internal_static_io_specmesh_kafka_schema_UserInfoEnriched_descriptor; } @java.lang.Override protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internalGetFieldAccessorTable() { - return 
io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.internal_static_io_specmesh_kafka_schema_UserInfoEnriched_fieldAccessorTable - .ensureFieldAccessorsInitialized( - io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched.class, - io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched.Builder.class); + return SimpleSchemaDemoPublicUserInfoEnriched.internal_static_io_specmesh_kafka_schema_UserInfoEnriched_fieldAccessorTable + .ensureFieldAccessorsInitialized(SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched.class, + SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched.Builder.class); } public static final int FULLNAME_FIELD_NUMBER = 1; @@ -290,10 +289,10 @@ public boolean equals(final java.lang.Object obj) { if (obj == this) { return true; } - if (!(obj instanceof io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched)) { + if (!(obj instanceof SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched)) { return super.equals(obj); } - io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched other = (io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched) obj; + SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched other = (SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched) obj; if (!getFullName().equals(other.getFullName())) return false; @@ -328,57 +327,56 @@ public int hashCode() { return hash; } - public static io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom( - java.nio.ByteBuffer data) throws com.google.protobuf.InvalidProtocolBufferException { + public static SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom(java.nio.ByteBuffer data) + throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom( - java.nio.ByteBuffer data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) + public static SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom(java.nio.ByteBuffer data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom( + public static SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom( + public static SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom( - byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { + public static SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } - public static io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched 
parseFrom( - byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) + public static SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom(byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } - public static io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom( - java.io.InputStream input) throws java.io.IOException { + public static SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom(java.io.InputStream input) + throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3.parseWithIOException(PARSER, input); } - public static io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom( - java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { + public static SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom(java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3.parseWithIOException(PARSER, input, extensionRegistry); } - public static io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseDelimitedFrom( + public static SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseDelimitedFrom( java.io.InputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3.parseDelimitedWithIOException(PARSER, input); } - public static io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseDelimitedFrom( + public static SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3.parseDelimitedWithIOException(PARSER, input, extensionRegistry); } - public static io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom( + public static SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3.parseWithIOException(PARSER, input); } - public static io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom( + public static SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return com.google.protobuf.GeneratedMessageV3.parseWithIOException(PARSER, input, extensionRegistry); @@ -391,8 +389,7 @@ public Builder newBuilderForType() { public static Builder newBuilder() { return DEFAULT_INSTANCE.toBuilder(); } - public static Builder newBuilder( - io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched prototype) { + public static Builder newBuilder(SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched prototype) { return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); } @java.lang.Override @@ -411,21 +408,20 @@ protected Builder newBuilderForType(com.google.protobuf.GeneratedMessageV3.Build public static final class Builder extends com.google.protobuf.GeneratedMessageV3.Builder implements // 
@@protoc_insertion_point(builder_implements:io.specmesh.kafka.schema.UserInfoEnriched) - io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnrichedOrBuilder { + SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnrichedOrBuilder { public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.internal_static_io_specmesh_kafka_schema_UserInfoEnriched_descriptor; + return SimpleSchemaDemoPublicUserInfoEnriched.internal_static_io_specmesh_kafka_schema_UserInfoEnriched_descriptor; } @java.lang.Override protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internalGetFieldAccessorTable() { - return io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.internal_static_io_specmesh_kafka_schema_UserInfoEnriched_fieldAccessorTable - .ensureFieldAccessorsInitialized( - io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched.class, - io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched.Builder.class); + return SimpleSchemaDemoPublicUserInfoEnriched.internal_static_io_specmesh_kafka_schema_UserInfoEnriched_fieldAccessorTable + .ensureFieldAccessorsInitialized(SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched.class, + SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched.Builder.class); } // Construct using - // io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched.newBuilder() + // io.specmesh.kafka.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched.newBuilder() private Builder() { } @@ -447,18 +443,17 @@ public Builder clear() { @java.lang.Override public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.internal_static_io_specmesh_kafka_schema_UserInfoEnriched_descriptor; + return SimpleSchemaDemoPublicUserInfoEnriched.internal_static_io_specmesh_kafka_schema_UserInfoEnriched_descriptor; } @java.lang.Override - public io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched getDefaultInstanceForType() { - return io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched - .getDefaultInstance(); + public SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched getDefaultInstanceForType() { + return SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched.getDefaultInstance(); } @java.lang.Override - public io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched build() { - io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched result = buildPartial(); + public SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched build() { + SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched result = buildPartial(); if (!result.isInitialized()) { throw newUninitializedMessageException(result); } @@ -466,8 +461,8 @@ public io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoE } @java.lang.Override - public io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched buildPartial() { - io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched result = new io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched( + public SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched buildPartial() { + SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched result = new SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched( 
this); if (bitField0_ != 0) { buildPartial0(result); @@ -476,8 +471,7 @@ public io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoE return result; } - private void buildPartial0( - io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched result) { + private void buildPartial0(SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched result) { int from_bitField0_ = bitField0_; if (((from_bitField0_ & 0x00000001) != 0)) { result.fullName_ = fullName_; @@ -521,19 +515,16 @@ public Builder addRepeatedField(com.google.protobuf.Descriptors.FieldDescriptor } @java.lang.Override public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched) { - return mergeFrom( - (io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched) other); + if (other instanceof SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched) { + return mergeFrom((SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched) other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom( - io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched other) { - if (other == io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched - .getDefaultInstance()) + public Builder mergeFrom(SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched other) { + if (other == SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched.getDefaultInstance()) return this; if (!other.getFullName().isEmpty()) { fullName_ = other.fullName_; @@ -897,12 +888,12 @@ public final Builder mergeUnknownFields(final com.google.protobuf.UnknownFieldSe } // @@protoc_insertion_point(class_scope:io.specmesh.kafka.schema.UserInfoEnriched) - private static final io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched DEFAULT_INSTANCE; + private static final SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched DEFAULT_INSTANCE; static { - DEFAULT_INSTANCE = new io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched(); + DEFAULT_INSTANCE = new SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched(); } - public static io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched getDefaultInstance() { + public static SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched getDefaultInstance() { return DEFAULT_INSTANCE; } @@ -936,7 +927,7 @@ public com.google.protobuf.Parser getParserForType() { } @java.lang.Override - public io.specmesh.kafka.schema.SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched getDefaultInstanceForType() { + public SimpleSchemaDemoPublicUserInfoEnriched.UserInfoEnriched getDefaultInstanceForType() { return DEFAULT_INSTANCE; } diff --git a/kafka-test/src/test/java/simple/schema_demo/_public/user_checkout_value/UserCheckout.java b/kafka-test/src/test/java/simple/schema_demo/_public/user_checkout_value/UserCheckout.java new file mode 100644 index 00000000..b3556548 --- /dev/null +++ b/kafka-test/src/test/java/simple/schema_demo/_public/user_checkout_value/UserCheckout.java @@ -0,0 +1,25 @@ +package simple.schema_demo._public.user_checkout_value; + + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject; +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaString; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data 
+@AllArgsConstructor +@NoArgsConstructor +@JsonSchemaInject(strings = { + @JsonSchemaString(path = "javaType", value = "simple.schema_demo._public.user_checkout_value.UserCheckout")}) +public class UserCheckout { + @JsonProperty + long id; + @JsonProperty + String name; + @JsonProperty + int price; + @JsonProperty + String systemDate; +} diff --git a/kafka-test/src/test/java/simple/schema_demo/_public/user_signed_up_value/UserSignedUp.java b/kafka-test/src/test/java/simple/schema_demo/_public/user_signed_up_value/UserSignedUp.java new file mode 100644 index 00000000..fdb3b401 --- /dev/null +++ b/kafka-test/src/test/java/simple/schema_demo/_public/user_signed_up_value/UserSignedUp.java @@ -0,0 +1,16 @@ +package simple.schema_demo._public.user_signed_up_value; + + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class UserSignedUp { + String fullName; + String email; + int age; + +} diff --git a/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_checkout.yml b/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_checkout.yml new file mode 100644 index 00000000..38e279ed --- /dev/null +++ b/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_checkout.yml @@ -0,0 +1,41 @@ +{ + "$schema":"http://json-schema.org/draft-07/schema#", + "title":"User Checkout", + "type":"object", + "additionalProperties":false, + "javaType":"simple.schema_demo._public.user_checkout_value.UserCheckout", + "properties":{ + "id":{ + "type":"integer" + }, + "name":{ + "oneOf":[ + { + "type":"null", + "title":"Not included" + }, + { + "type":"string" + } + ] + }, + "price":{ + "type":"integer" + }, + "systemDate":{ + "oneOf":[ + { + "type":"null", + "title":"Not included" + }, + { + "type":"string" + } + ] + } + }, + "required":[ + "id", + "price" + ] +} \ No newline at end of file diff --git a/kafka/src/test/resources/schema/simple.schema_demo._public.user_info.proto b/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_info.proto similarity index 100% rename from kafka/src/test/resources/schema/simple.schema_demo._public.user_info.proto rename to kafka-test/src/test/resources/schema/simple.schema_demo._public.user_info.proto diff --git a/kafka/src/test/resources/schema/simple.schema_demo._public.user_info_enriched.proto b/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_info_enriched.proto similarity index 100% rename from kafka/src/test/resources/schema/simple.schema_demo._public.user_info_enriched.proto rename to kafka-test/src/test/resources/schema/simple.schema_demo._public.user_info_enriched.proto diff --git a/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_signed_up.avsc b/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_signed_up.avsc new file mode 100644 index 00000000..73361bfc --- /dev/null +++ b/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_signed_up.avsc @@ -0,0 +1,10 @@ +{ + "type": "record", + "namespace": "simple.schema_demo._public.user_signed_up_value", + "name": "UserSignedUp", + "fields": [ + {"name": "fullName", "type": "string"}, + {"name": "email", "type": "string"}, + {"name": "age", "type": "int"} + ] +} \ No newline at end of file diff --git a/kafka/src/test/resources/simple_schema_demo-api.yaml b/kafka-test/src/test/resources/simple_schema_demo-api.yaml similarity index 100% rename from kafka/src/test/resources/simple_schema_demo-api.yaml rename 
to kafka-test/src/test/resources/simple_schema_demo-api.yaml diff --git a/kafka/src/main/java/io/specmesh/kafka/Clients.java b/kafka/src/main/java/io/specmesh/kafka/Clients.java index 8ba682cf..8f9d9d3c 100644 --- a/kafka/src/main/java/io/specmesh/kafka/Clients.java +++ b/kafka/src/main/java/io/specmesh/kafka/Clients.java @@ -15,15 +15,54 @@ import org.apache.kafka.streams.StreamsConfig; import org.jetbrains.annotations.NotNull; +/** + * Factory for Kafka clients + */ public final class Clients { private Clients() { } + /** + * Create a Kafka producer + * + * @param keyClass + * the type of the key + * @param valueClass + * the type of the value + * @param producerProperties + * the properties + * @param + * the type of the key + * @param + * the type of the value + * @return the producer + */ public static KafkaProducer producer(final Class keyClass, final Class valueClass, final Map producerProperties) { return new KafkaProducer<>(producerProperties); } + /** + * Create a map of producer properties with sensible defaults. + * + * @param domainId + * the domain id, used to scope resource names. + * @param serviceId + * the name of the service + * @param bootstrapServers + * bootstrap servers config + * @param schemaRegistryUrl + * url of schema registry + * @param keySerializerClass + * type of key serializer + * @param valueSerializerClass + * type of value serializer + * @param acksAll + * require acks from all replicas? + * @param providedProperties + * additional props + * @return props + */ @SuppressWarnings("checkstyle:ParameterNumber") @NotNull public static Map producerProperties(final String domainId, final String serviceId, @@ -45,6 +84,27 @@ public static Map producerProperties(final String domainId, fina providedProperties); } + /** + * Create props for KStream app with sensible defaults. + * + * @param domainId + * the domain id, used to scope resource names. + * @param serviceId + * the name of the service + * @param bootstrapServers + * bootstrap servers config + * @param schemaRegistryUrl + * url of schema registry + * @param keySerdeClass + * type of key serde + * @param valueSerdeClass + * type of value serde + * @param acksAll + * require acks from all replicas? + * @param providedProperties + * additional props + * @return the streams properties. + */ @SuppressWarnings("checkstyle:ParameterNumber") @NotNull public static Map kstreamsProperties(final String domainId, final String serviceId, @@ -69,11 +129,47 @@ public static Map kstreamsProperties(final String domainId, fina KafkaAvroSerializerConfig.USE_LATEST_VERSION, "true"), providedProperties); } + /** + * Create a Kafka consumer + * + * @param keyClass + * the type of the key + * @param valueClass + * the type of the value + * @param consumerProperties + * the properties + * @param + * the type of the key + * @param + * the type of the value + * @return the consumer + */ public static KafkaConsumer consumer(final Class keyClass, final Class valueClass, final Map consumerProperties) { return new KafkaConsumer<>(consumerProperties); } + /** + * Create a map of consumer properties with sensible defaults. + * + * @param domainId + * the domain id, used to scope resource names.
+ * @param serviceId + * the name of the service + * @param bootstrapServers + * bootstrap servers config + * @param schemaRegistryUrl + * url of schema registry + * @param keyDeserializerClass + * type of key deserializer + * @param valueDeserializerClass + * type of value deserializer + * @param autoOffsetResetEarliest + * reset to earliest offset if no stored offsets? + * @param providedProperties + * additional props + * @return props + */ @SuppressWarnings("checkstyle:ParameterNumber") @NotNull public static Map consumerProperties(final String domainId, final String serviceId, diff --git a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java index 45f73fbb..8d57b749 100644 --- a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java +++ b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java @@ -22,10 +22,17 @@ public class KafkaApiSpec { private final ApiSpec apiSpec; + /** + * @param apiSpec + * the api spec + */ public KafkaApiSpec(final ApiSpec apiSpec) { this.apiSpec = apiSpec; } + /** + * @return the id of the spec + */ public String id() { return apiSpec.id(); } @@ -33,6 +40,8 @@ public String id() { /** * Used by AdminClient.createTopics (includes support for configs overrides i.e. * min.insync.replicas + * + * @return the owned topics. */ public List listDomainOwnedTopics() { @@ -60,6 +69,8 @@ private void validateTopicConfig() { /** * Create an ACL for the domain-id principle that allows writing to any topic * prefixed with the Id Prevent non ACL'd ones from writing to it (somehow) + * + * @return Acl bindings for owned topics */ public List listACLsForDomainOwnedTopics() { validateTopicConfig(); @@ -102,10 +113,14 @@ public List listACLsForDomainOwnedTopics() { AclPermissionType.ALLOW)))); return protectedAccessAcls; } - public static String formatPrinciple(final String domainId) { - return "User:domain-" + domainId; - } + /** + * Get schema info for the supplied {@code topicName} + * + * @param topicName + * the name of the topic + * @return the schema info. + */ public SchemaInfo schemaInfoForTopic(final String topicName) { final List myTopics = listDomainOwnedTopics(); @@ -114,4 +129,14 @@ public SchemaInfo schemaInfoForTopic(final String topicName) { return apiSpec.channels().get(topicName).publish().schemaInfo(); } + + /** + * + * @param domainId + * the domain id + * @return the principal + */ + public static String formatPrinciple(final String domainId) { + return "User:domain-" + domainId; + } } diff --git a/kafka/src/main/java/io/specmesh/kafka/Provisioner.java b/kafka/src/main/java/io/specmesh/kafka/Provisioner.java index 62c745e8..6dfef6e8 100644 --- a/kafka/src/main/java/io/specmesh/kafka/Provisioner.java +++ b/kafka/src/main/java/io/specmesh/kafka/Provisioner.java @@ -18,30 +18,55 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.TopicListing; +/** + * Provisions Kafka and SR resources + */ public final class Provisioner { + + private static final int REQUEST_TIMEOUT = 10; + private Provisioner() { } - public static final int WAIT = 10; - + /** + * Provision topics in the Kafka cluster. + * + * @param adminClient + * admin client for the Kafka cluster. + * @param apiSpec + * the api spec.
+ * @return number of topics created + * @throws InterruptedException + * on interrupt + * @throws ExecutionException + * on remote API call failure + * @throws TimeoutException + * on timeout + */ public static int provisionTopics(final AdminClient adminClient, final KafkaApiSpec apiSpec) throws InterruptedException, ExecutionException, TimeoutException { final var domainTopics = apiSpec.listDomainOwnedTopics(); - final var existingTopics = adminClient.listTopics().listings().get(WAIT, TimeUnit.SECONDS).stream() + final var existingTopics = adminClient.listTopics().listings().get(REQUEST_TIMEOUT, TimeUnit.SECONDS).stream() .map(TopicListing::name).collect(Collectors.toList()); final var newTopicsToCreate = domainTopics.stream() .filter(newTopic -> !existingTopics.contains(newTopic.name())).collect(Collectors.toList()); - adminClient.createTopics(newTopicsToCreate).all().get(WAIT, TimeUnit.SECONDS); + adminClient.createTopics(newTopicsToCreate).all().get(REQUEST_TIMEOUT, TimeUnit.SECONDS); return newTopicsToCreate.size(); } /** - * Still need to - add schema compatibility checks - add schema meta data - * requirement so crappy schemas cannot be published + * Provision schemas to Schema Registry + * + * @param apiSpec + * the api spec + * @param schemaRegistryClient + * the client for the schema registry + * @param baseResourcePath + * the path under which external schemas are stored. */ public static void provisionSchemas(final KafkaApiSpec apiSpec, final SchemaRegistryClient schemaRegistryClient, final String baseResourcePath) { @@ -69,9 +94,23 @@ public static void provisionSchemas(final KafkaApiSpec apiSpec, final SchemaRegi })); } + /** + * Provision ACLs in the Kafka cluster + * + * @param adminClient + * the admin client for the cluster. + * @param apiSpec + * the api spec. + * @throws InterruptedException + * on interrupt + * @throws ExecutionException + * on remote API call failure + * @throws TimeoutException + * on timeout + */ public static void provisionAcls(final AdminClient adminClient, final KafkaApiSpec apiSpec) throws ExecutionException, InterruptedException, TimeoutException { - adminClient.createAcls(apiSpec.listACLsForDomainOwnedTopics()).all().get(WAIT, TimeUnit.SECONDS); + adminClient.createAcls(apiSpec.listACLsForDomainOwnedTopics()).all().get(REQUEST_TIMEOUT, TimeUnit.SECONDS); } static ParsedSchema getSchema(final String topicName, final String schemaRef, final String path, @@ -88,5 +127,4 @@ static ParsedSchema getSchema(final String topicName, final String schemaRef, fi } throw new RuntimeException("Failed to handle topic:" + topicName + " schema: " + path); } - } diff --git a/kafka/src/main/java/io/specmesh/kafka/schema/JsonSchemas.java b/kafka/src/main/java/io/specmesh/kafka/schema/JsonSchemas.java index c89da9a9..091822fd 100644 --- a/kafka/src/main/java/io/specmesh/kafka/schema/JsonSchemas.java +++ b/kafka/src/main/java/io/specmesh/kafka/schema/JsonSchemas.java @@ -14,9 +14,12 @@ import java.nio.file.Paths; import org.apache.commons.io.IOUtils; +/** + * Utility class for working with JSON schemas. + */ public final class JsonSchemas { - public static final Path SCHEMA_DIRECTORY = Paths.get("schema"); + private static final Path SCHEMA_DIRECTORY = Paths.get("schema"); private static final ObjectMapper yamlMapper = new ObjectMapper( new YAMLFactory().enable(YAMLGenerator.Feature.MINIMIZE_QUOTES)); @@ -25,10 +28,24 @@ public final class JsonSchemas { private JsonSchemas() { } + /** + * Load a schema from the classpath.
+ * + * @param schemaFile + * the path to the schema. + * @return the schema. + */ public static JsonSchema loadFromClasspath(final String schemaFile) { return loadFromClasspath(Paths.get(schemaFile)); } + /** + * Load a schema from the classpath. + * + * @param schemaFile + * the path to the schema. + * @return the schema. + */ public static JsonSchema loadFromClasspath(final Path schemaFile) { final String path = File.separator + SCHEMA_DIRECTORY.resolve(schemaFile); final URL resource = JsonSchemas.class.getResource(path); @@ -48,6 +65,15 @@ static String loadYamlFromUrl(final URL resource) throws IOException { return new String(IOUtils.toByteArray(resource), StandardCharsets.UTF_8); } + /** + * Convert a YAML into JSON. + * + * @param yaml + * the YAML to convert + * @return the JSON + * @throws IOException + * on invalid YAML + */ public static String yamlToJson(final String yaml) throws IOException { final Object obj = yamlMapper.readValue(yaml, Object.class); return jsonWriter.writeValueAsString(obj); diff --git a/kafka/src/main/java/io/specmesh/kafka/schema/RegisteredSchema.java b/kafka/src/main/java/io/specmesh/kafka/schema/RegisteredSchema.java index 70cb205e..57984eff 100644 --- a/kafka/src/main/java/io/specmesh/kafka/schema/RegisteredSchema.java +++ b/kafka/src/main/java/io/specmesh/kafka/schema/RegisteredSchema.java @@ -10,6 +10,10 @@ import lombok.Value; import lombok.experimental.Accessors; +/** + * Pojo for holding data about a schema that is registered with the schema + * registry. + */ @Value @Accessors(fluent = true) @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/kafka/src/main/java/io/specmesh/kafka/schema/SrSchemaManager.java b/kafka/src/main/java/io/specmesh/kafka/schema/SrSchemaManager.java index 7bcedba2..689d9055 100644 --- a/kafka/src/main/java/io/specmesh/kafka/schema/SrSchemaManager.java +++ b/kafka/src/main/java/io/specmesh/kafka/schema/SrSchemaManager.java @@ -6,13 +6,30 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.json.JsonSchema; import java.nio.file.Path; + +/** + * Wrapper around the {@link SchemaRegistryClient} + */ public class SrSchemaManager { private final SchemaRegistryClient schemaRegistry; + /** + * @param schemaRegistry + * the client + */ public SrSchemaManager(final SchemaRegistryClient schemaRegistry) { this.schemaRegistry = requireNonNull(schemaRegistry, "schemaRegistry"); } + /** + * Load a schema from the classpath into the local schema cache. + * + * @param schemaFile + * the path to the schema file + * @param subject + * the subject under which to store it + * @return the registered schema + */ public RegisteredSchema loadFromClasspath(final Path schemaFile, final String subject) { final JsonSchema jsonSchema = JsonSchemas.loadFromClasspath(schemaFile); @@ -24,6 +41,13 @@ public RegisteredSchema loadFromClasspath(final Path schemaFile, final String su } } + /** + * Load the latest version of a schema into the local schema cache. + * + * @param subject + * the schema subject to load. + * @return the registered schema + */ public RegisteredSchema loadLatest(final String subject) { try { final SchemaMetadata latestSchemaMetadata = schemaRegistry.getLatestSchemaMetadata(subject); @@ -34,6 +58,15 @@ public RegisteredSchema loadLatest(final String subject) { } } + /** + * Load a specific version of a schema into the local schema cache. + * + * @param subject + * the schema subject + * @param schemaId + * the unique schema id. + * @return the registered schema. 
+ */ public RegisteredSchema loadById(final String subject, final int schemaId) { try { return new RegisteredSchema(subject, schemaRegistry.getSchemaById(schemaId), schemaId); @@ -42,6 +75,15 @@ } } + /** + * Register a schema with the schema registry from a file in the classpath. + * + * @param schemaFile + * the path to the schema file + * @param subject + * the subject under which to register it + * @return the registered schema. + */ public RegisteredSchema registerFromClasspath(final Path schemaFile, final String subject) { final JsonSchema schema = JsonSchemas.loadFromClasspath(schemaFile); diff --git a/parser/src/main/java/io/specmesh/apiparser/AsyncApiParser.java b/parser/src/main/java/io/specmesh/apiparser/AsyncApiParser.java index 041bc481..1f70fe92 100644 --- a/parser/src/main/java/io/specmesh/apiparser/AsyncApiParser.java +++ b/parser/src/main/java/io/specmesh/apiparser/AsyncApiParser.java @@ -7,8 +7,20 @@ import java.io.IOException; import java.io.InputStream; +/** + * Parser for AsyncAPI spec documents + */ public class AsyncApiParser { + /** + * Parse an {@link ApiSpec} from the supplied {@code inputStream}. + * + * @param inputStream + * stream containing the spec. + * @return the api spec + * @throws IOException + * on error + */ public final ApiSpec loadResource(final InputStream inputStream) throws IOException { if (inputStream == null || inputStream.available() == 0) { throw new RuntimeException("Not found"); diff --git a/parser/src/main/java/io/specmesh/apiparser/model/ApiSpec.java b/parser/src/main/java/io/specmesh/apiparser/model/ApiSpec.java index 6eab451c..419c819c 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/ApiSpec.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/ApiSpec.java @@ -12,6 +12,9 @@ import lombok.Value; import lombok.experimental.Accessors; +/** + * Pojo representing the api spec + */ @Value @Accessors(fluent = true) @JsonIgnoreProperties(ignoreUnknown = true) @@ -20,17 +23,20 @@ public class ApiSpec { private static final char DELIMITER = System.getProperty("DELIMITER", ".").charAt(0); @JsonProperty - String id; + private String id; @JsonProperty - String version; + private String version; @JsonProperty - String asyncapi; + private String asyncapi; @JsonProperty - Map channels; + private Map channels; + /** + * @return unique spec id + */ public String id() { return validate(id).substring("urn:".length()).replace(":", DELIMITER + ""); } @@ -42,6 +48,9 @@ private String validate(final String id) { return id; } + /** + * @return channels + */ public Map channels() { return channels.entrySet().stream().collect(Collectors.toMap(e -> getCanonical(id(), e.getKey()), e -> e.getValue(), (k, v) -> k, LinkedHashMap::new)); diff --git a/parser/src/main/java/io/specmesh/apiparser/model/Bindings.java b/parser/src/main/java/io/specmesh/apiparser/model/Bindings.java index 6f472386..50c9a77d 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/Bindings.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/Bindings.java @@ -8,6 +8,9 @@ import lombok.Value; import lombok.experimental.Accessors; +/** + * Pojo representing a binding + */ @Value @Accessors(fluent = true) @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/parser/src/main/java/io/specmesh/apiparser/model/Channel.java b/parser/src/main/java/io/specmesh/apiparser/model/Channel.java index 147d29a9..4a527207 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/Channel.java +++
b/parser/src/main/java/io/specmesh/apiparser/model/Channel.java @@ -8,6 +8,9 @@ import lombok.Value; import lombok.experimental.Accessors; +/** + * Pojo representing a channel + */ @Value @Accessors(fluent = true) @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/parser/src/main/java/io/specmesh/apiparser/model/KafkaBinding.java b/parser/src/main/java/io/specmesh/apiparser/model/KafkaBinding.java index 2c9d014a..d23acb26 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/KafkaBinding.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/KafkaBinding.java @@ -12,57 +12,63 @@ import lombok.Value; import lombok.experimental.Accessors; -/** +/* * Note: Lombok has a Value/Defaults bug which means we have messy accessors to * cope with default values: See - ... * - ... */ +/** + * Pojo representing a Kafka binding + */ @Value @Accessors(fluent = true) @JsonIgnoreProperties(ignoreUnknown = true) @NoArgsConstructor(force = true) // , access = AccessLevel.PRIVATE) @SuppressFBWarnings public class KafkaBinding { - public static final int DAYS_TO_MS = 24 * 60 * 60 * 1000; - public static final String RETENTION_MS = "retention.ms"; + private static final int DAYS_TO_MS = 24 * 60 * 60 * 1000; + private static final String RETENTION_MS = "retention.ms"; @SuppressWarnings("unchecked") @JsonProperty - List envs = Collections.EMPTY_LIST; + private List envs = Collections.EMPTY_LIST; @JsonProperty - int partitions; + private int partitions; @JsonProperty - int replicas; + private int replicas; @JsonProperty - int retention; + private int retention; @JsonProperty - Map configs = Collections.emptyMap(); + private Map configs = Collections.emptyMap(); @JsonProperty - String groupId; + private String groupId; @JsonProperty - String schemaIdLocation; + private String schemaIdLocation; /** * confluent (payload) / apicurio-legacy / apicurio-new / ibm event-streams * serde (header) */ @JsonProperty - String schemaIdPayloadEncoding; + private String schemaIdPayloadEncoding; @JsonProperty - String schemaLookupStrategy; + private String schemaLookupStrategy; @JsonProperty - String bindingVersion; + private String bindingVersion; + /** + * @return configs + */ public Map configs() { final Map results = new LinkedHashMap<>(configs); @@ -72,14 +78,26 @@ public Map configs() { return results; } + /** + * + * @return number of topic partitions + */ public int partitions() { return partitions == 0 ? 1 : partitions; } + /** + * + * @return number of topic replicas + */ public int replicas() { return replicas == 0 ? 1 : replicas; } + /** + * + * @return message retention in ms. + */ public int retention() { return retention == 0 ? 1 : retention; } diff --git a/parser/src/main/java/io/specmesh/apiparser/model/Message.java b/parser/src/main/java/io/specmesh/apiparser/model/Message.java index 0e46218f..59ff5ae7 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/Message.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/Message.java @@ -13,8 +13,11 @@ import lombok.experimental.Accessors; /** - * ... + * Pojo representing a Message. 
+ * + * @see spec + * docs */ @Value @Accessors(fluent = true) @@ -24,46 +27,48 @@ @SuppressWarnings({"unchecked", "rawtypes"}) public class Message { @JsonProperty - String messageId; + private String messageId; @JsonProperty - Map headers = Collections.EMPTY_MAP; + private Map headers = Collections.EMPTY_MAP; @JsonProperty - Map payload = Collections.EMPTY_MAP; + private Map payload = Collections.EMPTY_MAP; @JsonProperty - Map correlationId = Collections.EMPTY_MAP; + private Map correlationId = Collections.EMPTY_MAP; @JsonProperty - String schemaFormat; + private String schemaFormat; @JsonProperty - String contentType; + private String contentType; @JsonProperty - String name; + private String name; @JsonProperty - String title; + private String title; @JsonProperty - String summary; + private String summary; @JsonProperty - String description; + private String description; @JsonProperty - List tags = Collections.EMPTY_LIST; + private List tags = Collections.EMPTY_LIST; @JsonProperty - Bindings bindings; + private Bindings bindings; @JsonProperty - Map traits = Collections.EMPTY_MAP; + private Map traits = Collections.EMPTY_MAP; + /** + * @return the location of the schema + */ public String schemaRef() { return (String) payload.get("$ref"); } - } diff --git a/parser/src/main/java/io/specmesh/apiparser/model/Operation.java b/parser/src/main/java/io/specmesh/apiparser/model/Operation.java index e5b93d08..214ec573 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/Operation.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/Operation.java @@ -13,8 +13,12 @@ import lombok.experimental.Accessors; /** - * https://www.asyncapi.com/docs/reference/specification/v2.4.0#operationObject - * https://www.asyncapi.com/docs/reference/specification/v2.4.0#messageTraitObject + * Pojo representing an Operation.
+ * + * @see operationObject + * @see messageTraitObject */ @Value @Accessors(fluent = true) @@ -24,28 +28,31 @@ @SuppressWarnings({"unchecked", "rawtypes"}) public class Operation { @JsonProperty - String operationId; + private String operationId; @JsonProperty - String summary; + private String summary; @JsonProperty - String description; + private String description; @SuppressWarnings({"rawtypes", "unchecked"}) @JsonProperty - List tags = Collections.EMPTY_LIST; + private List tags = Collections.EMPTY_LIST; @JsonProperty - Bindings bindings; + private Bindings bindings; // https://www.asyncapi.com/docs/reference/specification/v2.4.0#operationTraitObject @JsonProperty - Map traits = Collections.EMPTY_MAP; + private Map traits = Collections.EMPTY_MAP; @JsonProperty - Message message; + private Message message; + /** + * @return schema info + */ public SchemaInfo schemaInfo() { if (message.bindings() == null) { throw new IllegalStateException("Bindings not found for operation: " + operationId); diff --git a/parser/src/main/java/io/specmesh/apiparser/model/SchemaInfo.java b/parser/src/main/java/io/specmesh/apiparser/model/SchemaInfo.java index 4d055859..012d7ef5 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/SchemaInfo.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/SchemaInfo.java @@ -4,6 +4,9 @@ import lombok.Value; import lombok.experimental.Accessors; +/** + * Pojo representing a schema + */ @Value @Accessors(fluent = true) public class SchemaInfo { @@ -14,6 +17,20 @@ public class SchemaInfo { private final String contentType; private String schemaLookupStrategy; + /** + * @param schemaRef + * location of schema + * @param schemaFormat + * format of schema + * @param schemaIdLocation + * ??? + * @param schemaIdPayloadLocation + * ??? + * @param contentType + * content type of schema + * @param schemaLookupStrategy + * schema lookup strategy + */ public SchemaInfo(final String schemaRef, final String schemaFormat, final String schemaIdLocation, final String schemaIdPayloadLocation, final String contentType, final String schemaLookupStrategy) { this.schemaRef = schemaRef; diff --git a/parser/src/main/java/io/specmesh/apiparser/model/Tag.java b/parser/src/main/java/io/specmesh/apiparser/model/Tag.java index 2a0352c6..9d886745 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/Tag.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/Tag.java @@ -8,6 +8,9 @@ import lombok.Value; import lombok.experimental.Accessors; +/** + * Pojo representing a tag + */ @Value @Accessors(fluent = true) @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/settings.gradle.kts b/settings.gradle.kts index c3d01d77..41bb7a6d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -8,4 +8,4 @@ */ rootProject.name = "specmesh-build" -include("parser", "kafka") \ No newline at end of file +include("parser", "kafka", "kafka-test") \ No newline at end of file
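
Usage note (editor's sketch, not part of the patch): with `SchemaRegistryContainer` now shipped from the new `kafka-test` module, a downstream project can wire it to a Testcontainers Kafka broker roughly as below, assuming `kafka-test` and `org.testcontainers:kafka` are on the test classpath. The class name, the explicit shared network, and the `confluentinc/cp-kafka:6.0.2` tag (chosen to mirror the registry's default image version above) are illustrative assumptions, not taken from the patch.

import io.specmesh.kafka.schema.SchemaRegistryContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

public final class KafkaTestModuleExample {

    public static void main(final String[] args) {
        // Shared docker network so the registry can reach the broker by its network alias.
        final Network network = Network.newNetwork();

        try (KafkaContainer kafka = new KafkaContainer(
                DockerImageName.parse("confluentinc/cp-kafka:6.0.2")).withNetwork(network);
                SchemaRegistryContainer schemaRegistry =
                        new SchemaRegistryContainer("6.0.2").withKafka(kafka)) {

            // The broker must be running before the registry boots and connects to it.
            kafka.start();
            schemaRegistry.start();

            // getUrl() resolves the host-mapped port, e.g. http://localhost:55123.
            System.out.println("Schema Registry at " + schemaRegistry.getUrl());
            System.out.println("Kafka at " + kafka.getBootstrapServers());
        }
    }
}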