From 83e79aef0bfc00ad32d64d87843f0d521a5e8a52 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Mon, 13 May 2024 09:34:44 +0100 Subject: [PATCH] Don't blow up if no message is defined (#320) Currently, if a channel is defined with no `message` then the `operation.schemaInfo()` call throws an NPE. `message` is not required, so a `null` `message should not cause an NPE. Co-authored-by: Andrew Coates --- README.md | 6 +-- .../java/io/specmesh/kafka/KafkaApiSpec.java | 5 +- .../kafka/provision/TopicMutatorsTest.java | 19 ++++--- .../specmesh/apiparser/model/Operation.java | 8 ++- .../apiparser/AsyncApiSchemaParserTest.java | 54 +++++++++++++------ .../parser_simple_schema_demo-api.yaml | 5 ++ 6 files changed, 63 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 7c3ab1a2..b87e679e 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ channels: message: name: Food Item tags: - - name: "human", + - name: "human" - name: "purchase" ``` @@ -223,7 +223,7 @@ Source: com.example.trading-api.yml (spec) ``` Trade.avsc references the _Currency_ .avsc schema (the shared schema type) -```avro schema +```json { "metadata": { "author": "John Doe", @@ -298,4 +298,4 @@ channels: # Developer Notes 1. Install the intellij checkstyle plugin and load the config from config/checkstyle.xml -1. build using: ./gradlew spotlessApply build +1. build using: `./gradlew` diff --git a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java index 03a49be6..9654cfed 100644 --- a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java +++ b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java @@ -175,7 +175,7 @@ public Optional ownedTopicSchemas(final String topicName) { throw new APIException("Unknown topic:" + topicName); } - return Optional.ofNullable(channel.publish()).map(Operation::schemaInfo); + return Optional.ofNullable(channel.publish()).flatMap(Operation::schemaInfo); } /** @@ -195,7 +195,8 @@ public Stream topicSchemas(final String topicName) { return Stream.of(channel.publish(), channel.subscribe()) .filter(Objects::nonNull) - .map(Operation::schemaInfo); + .map(Operation::schemaInfo) + .flatMap(Optional::stream); } /** diff --git a/kafka/src/test/java/io/specmesh/kafka/provision/TopicMutatorsTest.java b/kafka/src/test/java/io/specmesh/kafka/provision/TopicMutatorsTest.java index 07f3edc9..b144af1f 100644 --- a/kafka/src/test/java/io/specmesh/kafka/provision/TopicMutatorsTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/provision/TopicMutatorsTest.java @@ -46,20 +46,21 @@ class TopicMutatorsTest { @Mock Admin client; + @Mock KafkaFuture descriptionFuture; + @Mock KafkaFuture createPartitionsFuture; @Test void shouldWriteUpdatesForRetentionChange() throws Exception { final var topicWriter = new UpdateMutator(client); // Mock hell - crappy admin api - final var topicDescriptionFuture = mock(KafkaFuture.class); - when(topicDescriptionFuture.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS)) + when(descriptionFuture.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS)) .thenReturn(getTopicDescription()); final var describeResult = mock(DescribeTopicsResult.class); // test value of existing topic - when(describeResult.topicNameValues()).thenReturn(Map.of("test", topicDescriptionFuture)); + when(describeResult.topicNameValues()).thenReturn(Map.of("test", descriptionFuture)); when(client.describeTopics(List.of("test"))).thenReturn(describeResult); @@ -86,22 +87,20 @@ void shouldWriteUpdatesToPartitionsWhenLarger() throws Exception { final var topicWriter = new UpdateMutator(client); // Mock hell - crappy admin api - final var topicDescriptionFuture = mock(KafkaFuture.class); - when(topicDescriptionFuture.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS)) + when(descriptionFuture.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS)) .thenReturn(getTopicDescription()); final var describeResult = mock(DescribeTopicsResult.class); // test value of existing topic - when(describeResult.topicNameValues()).thenReturn(Map.of("test", topicDescriptionFuture)); + when(describeResult.topicNameValues()).thenReturn(Map.of("test", descriptionFuture)); when(client.describeTopics(List.of("test"))).thenReturn(describeResult); final var createPartitionsResult = mock(CreatePartitionsResult.class); - final var createPartitionFuture = mock(KafkaFuture.class); - when(createPartitionsResult.all()).thenReturn(createPartitionFuture); - when(createPartitionFuture.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS)) - .thenReturn(Void.class); + when(createPartitionsResult.all()).thenReturn(createPartitionsFuture); + when(createPartitionsFuture.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS)) + .thenReturn(null); when(client.createPartitions(any(), any())).thenReturn(createPartitionsResult); // test diff --git a/parser/src/main/java/io/specmesh/apiparser/model/Operation.java b/parser/src/main/java/io/specmesh/apiparser/model/Operation.java index b39e3176..4edf5fd7 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/Operation.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/Operation.java @@ -22,6 +22,7 @@ import io.specmesh.apiparser.AsyncApiParser.APIParserException; import java.util.List; import java.util.Map; +import java.util.Optional; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -66,9 +67,12 @@ public class Operation { /** * @return schema info */ - public SchemaInfo schemaInfo() { + public Optional schemaInfo() { + if (message == null) { + return Optional.empty(); + } try { - return message.schemas(); + return Optional.of(message.schemas()); } catch (Exception e) { throw new APIParserException( "Error extracting schemas from (publish|subscribe) operation: " + operationId, diff --git a/parser/src/test/java/io/specmesh/apiparser/AsyncApiSchemaParserTest.java b/parser/src/test/java/io/specmesh/apiparser/AsyncApiSchemaParserTest.java index 4b1a1652..9d4c20fe 100644 --- a/parser/src/test/java/io/specmesh/apiparser/AsyncApiSchemaParserTest.java +++ b/parser/src/test/java/io/specmesh/apiparser/AsyncApiSchemaParserTest.java @@ -18,7 +18,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import io.specmesh.apiparser.model.ApiSpec; @@ -27,6 +26,7 @@ import io.specmesh.apiparser.model.RecordPart; import io.specmesh.apiparser.model.RecordPart.KafkaPart; import io.specmesh.apiparser.model.RecordPart.SchemaRefPart; +import io.specmesh.apiparser.model.SchemaInfo; import java.util.Map; import java.util.Optional; import org.junit.jupiter.api.Test; @@ -38,11 +38,7 @@ public class AsyncApiSchemaParserTest { @Test public void shouldReturnProducerMessageSchema() { final Map channelsMap = API_SPEC.channels(); - assertThat(channelsMap.entrySet(), hasSize(2)); - assertThat( - "Should have assembled 'id + channelname'", - channelsMap.keySet(), - hasItem(".simple.schema-demo._public.user.signed")); + assertThat(channelsMap.keySet(), hasItem(".simple.schema-demo._public.user.signed")); final Operation op = channelsMap.get(".simple.schema-demo._public.user.signed").publish(); @@ -57,20 +53,26 @@ public void shouldReturnProducerMessageSchema() { op.message().payload().schemaRef(), is(Optional.of("simple_schema_demo_user-signedup.avsc"))); - assertThat(op.schemaInfo().schemaFormat(), is(Optional.of(op.message().schemaFormat()))); - assertThat(op.schemaInfo().contentType(), is(Optional.of(op.message().contentType()))); assertThat( - op.schemaInfo().schemaIdLocation(), + op.schemaInfo().flatMap(SchemaInfo::schemaFormat), + is(Optional.of(op.message().schemaFormat()))); + assertThat( + op.schemaInfo().flatMap(SchemaInfo::contentType), + is(Optional.of(op.message().contentType()))); + assertThat( + op.schemaInfo().flatMap(SchemaInfo::schemaIdLocation), is(Optional.of(op.message().bindings().kafka().schemaIdLocation()))); - assertThat(op.schemaInfo().key(), is(op.message().bindings().kafka().key())); - assertThat(op.schemaInfo().value(), is(op.message().payload())); + assertThat( + op.schemaInfo().flatMap(SchemaInfo::key), + is(op.message().bindings().kafka().key())); + assertThat(op.schemaInfo().map(SchemaInfo::value), is(Optional.of(op.message().payload()))); } @Test public void shouldReturnSubscriberMessageSchema() { // Given: final Map channelsMap = API_SPEC.channels(); - + assertThat(channelsMap.keySet(), hasItem("london.hammersmith.transport._public.tube")); // When: final Operation op = channelsMap.get("london.hammersmith.transport._public.tube").subscribe(); @@ -87,12 +89,30 @@ public void shouldReturnSubscriberMessageSchema() { op.message().payload().schemaRef(), is(Optional.of("london_hammersmith_transport_public_passenger.avsc"))); - assertThat(op.schemaInfo().schemaFormat(), is(Optional.of(op.message().schemaFormat()))); - assertThat(op.schemaInfo().contentType(), is(Optional.of(op.message().contentType()))); assertThat( - op.schemaInfo().schemaIdLocation(), + op.schemaInfo().flatMap(SchemaInfo::schemaFormat), + is(Optional.of(op.message().schemaFormat()))); + assertThat( + op.schemaInfo().flatMap(SchemaInfo::contentType), + is(Optional.of(op.message().contentType()))); + assertThat( + op.schemaInfo().flatMap(SchemaInfo::schemaIdLocation), is(Optional.of(op.message().bindings().kafka().schemaIdLocation()))); - assertThat(op.schemaInfo().key(), is(op.message().bindings().kafka().key())); - assertThat(op.schemaInfo().value(), is(op.message().payload())); + assertThat( + op.schemaInfo().flatMap(SchemaInfo::key), + is(op.message().bindings().kafka().key())); + assertThat(op.schemaInfo().map(SchemaInfo::value), is(Optional.of(op.message().payload()))); + } + + @Test + void shouldNotBlowUpIfNotMessageDefined() { + // Given: + final Map channelsMap = API_SPEC.channels(); + assertThat(channelsMap.keySet(), hasItem(".simple.schema-demo._public.user.message-less")); + final Operation op = + channelsMap.get(".simple.schema-demo._public.user.message-less").publish(); + + // Then: + assertThat(op.schemaInfo(), is(Optional.empty())); } } diff --git a/parser/src/test/resources/parser_simple_schema_demo-api.yaml b/parser/src/test/resources/parser_simple_schema_demo-api.yaml index 6186c1ed..cfcdcbda 100644 --- a/parser/src/test/resources/parser_simple_schema_demo-api.yaml +++ b/parser/src/test/resources/parser_simple_schema_demo-api.yaml @@ -41,6 +41,11 @@ channels: payload: $ref: "simple_schema_demo_user-signedup.avsc" + _public.user.message-less: + # channel with no message defined: + publish: + operationId: opId + # SUBSCRIBER WILL REQUEST SCHEMA from SR and CodeGen required classes. Header will be used for Id london.hammersmith.transport._public.tube: subscribe: