Skip to content

Commit

Permalink
Don't blow up if no message is defined (#320)
Browse files Browse the repository at this point in the history
Currently, if a channel is defined with no `message` then the `operation.schemaInfo()` call throws an NPE.  `message` is not required, so a `null` `message should not cause an NPE.

Co-authored-by: Andrew Coates <[email protected]>
  • Loading branch information
big-andy-coates and Andrew Coates authored May 13, 2024
1 parent a4c31b8 commit 83e79ae
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 34 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ channels:
message:
name: Food Item
tags:
- name: "human",
- name: "human"
- name: "purchase"
```
Expand Down Expand Up @@ -223,7 +223,7 @@ Source: com.example.trading-api.yml (spec)
```
Trade.avsc references the _Currency_ .avsc schema (the shared schema type)
```avro schema
```json
{
"metadata": {
"author": "John Doe",
Expand Down Expand Up @@ -298,4 +298,4 @@ channels:
# Developer Notes
1. Install the intellij checkstyle plugin and load the config from config/checkstyle.xml
1. build using: ./gradlew spotlessApply build
1. build using: `./gradlew`
5 changes: 3 additions & 2 deletions kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public Optional<SchemaInfo> ownedTopicSchemas(final String topicName) {
throw new APIException("Unknown topic:" + topicName);
}

return Optional.ofNullable(channel.publish()).map(Operation::schemaInfo);
return Optional.ofNullable(channel.publish()).flatMap(Operation::schemaInfo);
}

/**
Expand All @@ -195,7 +195,8 @@ public Stream<SchemaInfo> topicSchemas(final String topicName) {

return Stream.of(channel.publish(), channel.subscribe())
.filter(Objects::nonNull)
.map(Operation::schemaInfo);
.map(Operation::schemaInfo)
.flatMap(Optional::stream);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,21 @@
class TopicMutatorsTest {

@Mock Admin client;
@Mock KafkaFuture<TopicDescription> descriptionFuture;
@Mock KafkaFuture<Void> createPartitionsFuture;

@Test
void shouldWriteUpdatesForRetentionChange() throws Exception {
final var topicWriter = new UpdateMutator(client);

// Mock hell - crappy admin api
final var topicDescriptionFuture = mock(KafkaFuture.class);
when(topicDescriptionFuture.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS))
when(descriptionFuture.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS))
.thenReturn(getTopicDescription());

final var describeResult = mock(DescribeTopicsResult.class);

// test value of existing topic
when(describeResult.topicNameValues()).thenReturn(Map.of("test", topicDescriptionFuture));
when(describeResult.topicNameValues()).thenReturn(Map.of("test", descriptionFuture));

when(client.describeTopics(List.of("test"))).thenReturn(describeResult);

Expand All @@ -86,22 +87,20 @@ void shouldWriteUpdatesToPartitionsWhenLarger() throws Exception {
final var topicWriter = new UpdateMutator(client);

// Mock hell - crappy admin api
final var topicDescriptionFuture = mock(KafkaFuture.class);
when(topicDescriptionFuture.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS))
when(descriptionFuture.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS))
.thenReturn(getTopicDescription());

final var describeResult = mock(DescribeTopicsResult.class);

// test value of existing topic
when(describeResult.topicNameValues()).thenReturn(Map.of("test", topicDescriptionFuture));
when(describeResult.topicNameValues()).thenReturn(Map.of("test", descriptionFuture));

when(client.describeTopics(List.of("test"))).thenReturn(describeResult);

final var createPartitionsResult = mock(CreatePartitionsResult.class);
final var createPartitionFuture = mock(KafkaFuture.class);
when(createPartitionsResult.all()).thenReturn(createPartitionFuture);
when(createPartitionFuture.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS))
.thenReturn(Void.class);
when(createPartitionsResult.all()).thenReturn(createPartitionsFuture);
when(createPartitionsFuture.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS))
.thenReturn(null);
when(client.createPartitions(any(), any())).thenReturn(createPartitionsResult);

// test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.specmesh.apiparser.AsyncApiParser.APIParserException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
Expand Down Expand Up @@ -66,9 +67,12 @@ public class Operation {
/**
* @return schema info
*/
public SchemaInfo schemaInfo() {
public Optional<SchemaInfo> schemaInfo() {
if (message == null) {
return Optional.empty();
}
try {
return message.schemas();
return Optional.of(message.schemas());
} catch (Exception e) {
throw new APIParserException(
"Error extracting schemas from (publish|subscribe) operation: " + operationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import io.specmesh.apiparser.model.ApiSpec;
Expand All @@ -27,6 +26,7 @@
import io.specmesh.apiparser.model.RecordPart;
import io.specmesh.apiparser.model.RecordPart.KafkaPart;
import io.specmesh.apiparser.model.RecordPart.SchemaRefPart;
import io.specmesh.apiparser.model.SchemaInfo;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;
Expand All @@ -38,11 +38,7 @@ public class AsyncApiSchemaParserTest {
@Test
public void shouldReturnProducerMessageSchema() {
final Map<String, Channel> channelsMap = API_SPEC.channels();
assertThat(channelsMap.entrySet(), hasSize(2));
assertThat(
"Should have assembled 'id + channelname'",
channelsMap.keySet(),
hasItem(".simple.schema-demo._public.user.signed"));
assertThat(channelsMap.keySet(), hasItem(".simple.schema-demo._public.user.signed"));

final Operation op = channelsMap.get(".simple.schema-demo._public.user.signed").publish();

Expand All @@ -57,20 +53,26 @@ public void shouldReturnProducerMessageSchema() {
op.message().payload().schemaRef(),
is(Optional.of("simple_schema_demo_user-signedup.avsc")));

assertThat(op.schemaInfo().schemaFormat(), is(Optional.of(op.message().schemaFormat())));
assertThat(op.schemaInfo().contentType(), is(Optional.of(op.message().contentType())));
assertThat(
op.schemaInfo().schemaIdLocation(),
op.schemaInfo().flatMap(SchemaInfo::schemaFormat),
is(Optional.of(op.message().schemaFormat())));
assertThat(
op.schemaInfo().flatMap(SchemaInfo::contentType),
is(Optional.of(op.message().contentType())));
assertThat(
op.schemaInfo().flatMap(SchemaInfo::schemaIdLocation),
is(Optional.of(op.message().bindings().kafka().schemaIdLocation())));
assertThat(op.schemaInfo().key(), is(op.message().bindings().kafka().key()));
assertThat(op.schemaInfo().value(), is(op.message().payload()));
assertThat(
op.schemaInfo().flatMap(SchemaInfo::key),
is(op.message().bindings().kafka().key()));
assertThat(op.schemaInfo().map(SchemaInfo::value), is(Optional.of(op.message().payload())));
}

@Test
public void shouldReturnSubscriberMessageSchema() {
// Given:
final Map<String, Channel> channelsMap = API_SPEC.channels();

assertThat(channelsMap.keySet(), hasItem("london.hammersmith.transport._public.tube"));
// When:
final Operation op =
channelsMap.get("london.hammersmith.transport._public.tube").subscribe();
Expand All @@ -87,12 +89,30 @@ public void shouldReturnSubscriberMessageSchema() {
op.message().payload().schemaRef(),
is(Optional.of("london_hammersmith_transport_public_passenger.avsc")));

assertThat(op.schemaInfo().schemaFormat(), is(Optional.of(op.message().schemaFormat())));
assertThat(op.schemaInfo().contentType(), is(Optional.of(op.message().contentType())));
assertThat(
op.schemaInfo().schemaIdLocation(),
op.schemaInfo().flatMap(SchemaInfo::schemaFormat),
is(Optional.of(op.message().schemaFormat())));
assertThat(
op.schemaInfo().flatMap(SchemaInfo::contentType),
is(Optional.of(op.message().contentType())));
assertThat(
op.schemaInfo().flatMap(SchemaInfo::schemaIdLocation),
is(Optional.of(op.message().bindings().kafka().schemaIdLocation())));
assertThat(op.schemaInfo().key(), is(op.message().bindings().kafka().key()));
assertThat(op.schemaInfo().value(), is(op.message().payload()));
assertThat(
op.schemaInfo().flatMap(SchemaInfo::key),
is(op.message().bindings().kafka().key()));
assertThat(op.schemaInfo().map(SchemaInfo::value), is(Optional.of(op.message().payload())));
}

@Test
void shouldNotBlowUpIfNotMessageDefined() {
// Given:
final Map<String, Channel> channelsMap = API_SPEC.channels();
assertThat(channelsMap.keySet(), hasItem(".simple.schema-demo._public.user.message-less"));
final Operation op =
channelsMap.get(".simple.schema-demo._public.user.message-less").publish();

// Then:
assertThat(op.schemaInfo(), is(Optional.empty()));
}
}
5 changes: 5 additions & 0 deletions parser/src/test/resources/parser_simple_schema_demo-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ channels:
payload:
$ref: "simple_schema_demo_user-signedup.avsc"

_public.user.message-less:
# channel with no message defined:
publish:
operationId: opId

# SUBSCRIBER WILL REQUEST SCHEMA from SR and CodeGen required classes. Header will be used for Id
london.hammersmith.transport._public.tube:
subscribe:
Expand Down

0 comments on commit 83e79ae

Please sign in to comment.