From a4c31b84070ee894a56fa8b3b3ecbe8852b1be87 Mon Sep 17 00:00:00 2001
From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com>
Date: Thu, 9 May 2024 16:53:39 +0100
Subject: [PATCH] Support key schemas and types (#319)

* fixes: #301

BREAKING CHANGE

With Kafka, different parts of the message, i.e. the Kafka record, can have type and schema information associated with them. Most people think of the record value, but there is also the key to think about, and in the future the headers.

This change adds support for providing type & schema information for the key of each Kafka record in a topic. This is done by extracting the key information from the channel's `message.bindings.kafka.key` field. For example, the following defines a Kafka topic with a key that contains a complex type:

```yaml
message:
  bindings:
    kafka:
      key:
        $ref: "/schema/key.avsc"
  payload:
    $ref: "/schema/value.avsc"
```

The key, or indeed the value, can also be defined as holding one of the inbuilt Kafka types, i.e. types for which most Kafka clients provide serializers out of the box, e.g.:

```yaml
message:
  bindings:
    kafka:
      key:
        type: string
  payload:
    $ref: "/schema/value.avsc"
```

Inline schemas for both key and value can still be parsed; however, provisioning of inline schemas remains unsupported.

* Review comments

* Simplify API

---------

Co-authored-by: Andrew Coates
---
 README.md | 10 +
 cli/resources/simple_schema_demo-api.yaml | 6 +
 cli/src/main/java/io/specmesh/cli/Export.java | 4 +-
 .../main/java/io/specmesh/cli/Flatten.java | 5 +-
 .../resources/simple_schema_demo-api.yaml | 6 +
 .../main/java/io/specmesh/kafka/Exporter.java | 2 +-
 .../io/specmesh/kafka/ExporterYamlWriter.java | 5 +-
 .../java/io/specmesh/kafka/KafkaApiSpec.java | 50 +++-
 .../provision/schema/SchemaProvisioner.java | 95 ++++---
 .../kafka/provision/schema/SchemaReaders.java | 44 ++--
 .../io/specmesh/kafka/KafkaAPISpecTest.java | 8 +-
 .../KafkaAPISpecWithGrantAccessAclsTest.java | 7 +-
 .../ProvisionerFreshStartFunctionalTest.java | 9 +-
 .../SchemaProvisionerFunctionalTest.java | 97 ++++---
 .../SchemaChangeSetCalculatorsTest.java | 5 +-
 ...datalondon-api-with-grant-access-acls.yaml | 8 +
 .../src/test/resources/bigdatalondon-api.yaml | 12 +
 .../clientapi-functional-test-api.yaml | 4 +
 .../resources/exporter-expected-spec.yaml | 7 +-
 .../provisioner-functional-test-api.yaml | 6 +-
 ...rovisioner-update-functional-test-api.yaml | 4 +
 ...ision_demo._public.user_signed_up.key.avsc | 8 +
 parser/build.gradle.kts | 1 +
 .../io/specmesh/apiparser/AsyncApiParser.java | 5 +-
 .../io/specmesh/apiparser/model/Bindings.java | 2 +-
 .../apiparser/model/KafkaBinding.java | 16 +-
 .../io/specmesh/apiparser/model/Message.java | 25 +-
 .../specmesh/apiparser/model/Operation.java | 24 +-
 .../specmesh/apiparser/model/RecordPart.java | 211 +++++++++++++++
 .../specmesh/apiparser/model/SchemaInfo.java | 30 ++-
 .../specmesh/apiparser/parse/SpecMapper.java | 35 +++
 .../specmesh/apiparser/util/JsonLocation.java | 61 +++++
 .../apiparser/AsyncApiSchemaParserTest.java | 63 +++--
 .../specmesh/apiparser/model/MessageTest.java | 244 ++++++++++++++++++
 .../apiparser/model/RecordPartTest.java | 182 +++++++++++++
 .../apiparser/util/JsonLocationTest.java | 91 +++++++
 .../parser_simple_schema_demo-api.yaml | 2 +
 37 files changed, 1199 insertions(+), 195 deletions(-)
 create mode 100644 kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.key.avsc
 create mode 100644 parser/src/main/java/io/specmesh/apiparser/model/RecordPart.java
 create mode 100644 parser/src/main/java/io/specmesh/apiparser/parse/SpecMapper.java
 create mode 100644 parser/src/main/java/io/specmesh/apiparser/util/JsonLocation.java
 create mode 100644 parser/src/test/java/io/specmesh/apiparser/model/MessageTest.java
 create mode 100644 parser/src/test/java/io/specmesh/apiparser/model/RecordPartTest.java
 create mode 100644 parser/src/test/java/io/specmesh/apiparser/util/JsonLocationTest.java
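For orientation, a minimal sketch of how the key metadata added by this patch surfaces through the parser API. The file name, channel name and demo class are illustrative placeholders and not part of the patch; the accessors (`key()`, `value()`, `schemaRef()`, `kafkaType()`) are the ones introduced below:

```java
import io.specmesh.apiparser.AsyncApiParser;
import io.specmesh.apiparser.model.ApiSpec;
import io.specmesh.apiparser.model.RecordPart;
import io.specmesh.apiparser.model.SchemaInfo;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public final class KeySchemaDemo {
    public static void main(final String[] args) throws Exception {
        // Load a spec; the file name and channel name below are placeholders.
        final ApiSpec spec;
        try (InputStream s = Files.newInputStream(Path.of("simple_schema_demo-api.yaml"))) {
            spec = new AsyncApiParser().loadResource(s);
        }

        // schemaInfo() now carries metadata for both parts of the record.
        final SchemaInfo si =
                spec.channels().get("acme.demo._public.user_signed_up").publish().schemaInfo();

        // The key is either one of the inbuilt Kafka types ...
        si.key().flatMap(RecordPart::kafkaType)
                .ifPresent(type -> System.out.println("key type: " + type));
        // ... or a reference to an external schema file.
        si.key().flatMap(RecordPart::schemaRef)
                .ifPresent(ref -> System.out.println("key schema: " + ref));

        // The value is modelled the same way.
        si.value().schemaRef().ifPresent(ref -> System.out.println("value schema: " + ref));
    }
}
```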
diff --git a/README.md b/README.md index 53c2e92e..7c3ab1a2 100644 --- a/README.md +++ b/README.md @@ -43,12 +43,20 @@ channels: publish: message: + bindings: + kafka: + key: + type: long payload: $ref: "/schema/simple.schema_demo._public.user_signed_up.avsc" _private.user_checkout: publish: message: + bindings: + kafka: + key: + $ref: "/schema/simple.schema_demo._public.user_checkout_key.yml" payload: $ref: "/schema/simple.schema_demo._public.user_checkout.yml" @@ -76,6 +84,8 @@ channels: 2.2. Schema published: - /schema/simple.schema_demo._public.user_signed_up.avsc +- /schema/simple.schema_demo._public.user_checkout_key.yml +- /schema/simple.schema_demo._public.user_checkout.yml 2.3. ACLs created: - "name" : "(pattern=ResourcePattern(resourceType=TOPIC, name=simple.spec_demo._public, patternType=PREFIXED), entry=(principal=User:*, host=*, operation=READ, permissionType=ALLOW))", diff --git a/cli/resources/simple_schema_demo-api.yaml b/cli/resources/simple_schema_demo-api.yaml index 52bc5b0c..f31cb0dc 100644 --- a/cli/resources/simple_schema_demo-api.yaml +++ b/cli/resources/simple_schema_demo-api.yaml @@ -35,6 +35,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" @@ -60,6 +62,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" @@ -87,6 +91,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" diff --git a/cli/src/main/java/io/specmesh/cli/Export.java b/cli/src/main/java/io/specmesh/cli/Export.java index 7e27343f..dff728d1 100644 --- a/cli/src/main/java/io/specmesh/cli/Export.java +++ b/cli/src/main/java/io/specmesh/cli/Export.java @@ -20,10 +20,10 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.specmesh.apiparser.model.ApiSpec; +import io.specmesh.apiparser.parse.SpecMapper; import io.specmesh.kafka.Clients; import io.specmesh.kafka.Exporter; import java.util.Map; @@ -92,7 +92,7 @@ public Integer call() throws Exception { try (Admin adminClient = Clients.adminClient(brokerUrl, username, secret)) { final var apiSpec = Exporter.export(aggid, adminClient); final var mapper = - new ObjectMapper() + SpecMapper.mapper() .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); System.out.println(mapper.writeValueAsString(apiSpec)); diff --git a/cli/src/main/java/io/specmesh/cli/Flatten.java b/cli/src/main/java/io/specmesh/cli/Flatten.java index d8c87093..4c3e3558 100644 --- a/cli/src/main/java/io/specmesh/cli/Flatten.java +++ b/cli/src/main/java/io/specmesh/cli/Flatten.java @@ -20,10 +20,9 @@ import
com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.specmesh.apiparser.parse.SpecMapper; import io.specmesh.kafka.KafkaApiSpec; import java.io.FileOutputStream; import java.nio.charset.StandardCharsets; @@ -73,7 +72,7 @@ public Integer call() throws Exception { final var apiSpec = apiSpec1.apiSpec(); final var mapper = - new ObjectMapper(new YAMLFactory()) + SpecMapper.mapper() .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); final var channels = apiSpec.channels(); diff --git a/cli/src/test/resources/simple_schema_demo-api.yaml b/cli/src/test/resources/simple_schema_demo-api.yaml index 7729b91d..5122e69c 100644 --- a/cli/src/test/resources/simple_schema_demo-api.yaml +++ b/cli/src/test/resources/simple_schema_demo-api.yaml @@ -33,6 +33,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" @@ -58,6 +60,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" @@ -84,6 +88,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" diff --git a/kafka/src/main/java/io/specmesh/kafka/Exporter.java b/kafka/src/main/java/io/specmesh/kafka/Exporter.java index e0d4d175..84d7f2e0 100644 --- a/kafka/src/main/java/io/specmesh/kafka/Exporter.java +++ b/kafka/src/main/java/io/specmesh/kafka/Exporter.java @@ -78,7 +78,7 @@ private static TopicReaders.TopicReader reader( } /** - * Extract the Channel - todo Produce/Consume info + * Extract the Channel * * @param topic - kafka topic config map * @return decorated channel diff --git a/kafka/src/main/java/io/specmesh/kafka/ExporterYamlWriter.java b/kafka/src/main/java/io/specmesh/kafka/ExporterYamlWriter.java index 8dba9b2f..4a2851d4 100644 --- a/kafka/src/main/java/io/specmesh/kafka/ExporterYamlWriter.java +++ b/kafka/src/main/java/io/specmesh/kafka/ExporterYamlWriter.java @@ -17,9 +17,8 @@ package io.specmesh.kafka; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import io.specmesh.apiparser.model.ApiSpec; +import io.specmesh.apiparser.parse.SpecMapper; /** Export the Spec object to its yaml representation */ public class ExporterYamlWriter implements ExporterWriter { @@ -34,7 +33,7 @@ public class ExporterYamlWriter implements ExporterWriter { @Override public String export(final ApiSpec apiSpec) throws Exporter.ExporterException { try { - return new ObjectMapper(new YAMLFactory()).writeValueAsString(apiSpec); + return SpecMapper.mapper().writeValueAsString(apiSpec); } catch (JsonProcessingException e) { throw new Exporter.ExporterException("Failed to convert to YAML", e); } diff --git a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java index 2020b3d3..03a49be6 100644 --- a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java +++ 
b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java @@ -34,6 +34,8 @@ import io.specmesh.apiparser.AsyncApiParser; import io.specmesh.apiparser.model.ApiSpec; +import io.specmesh.apiparser.model.Channel; +import io.specmesh.apiparser.model.Operation; import io.specmesh.apiparser.model.SchemaInfo; import java.io.FileInputStream; import java.io.InputStream; @@ -42,8 +44,11 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; @@ -151,13 +156,46 @@ public Set requiredAcls() { * @return the schema info. */ public SchemaInfo schemaInfoForTopic(final String topicName) { - final List myTopics = listDomainOwnedTopics(); - myTopics.stream() - .filter(topic -> topic.name().equals(topicName)) - .findFirst() - .orElseThrow(() -> new APIException("Not a domain topic:" + topicName)); + return ownedTopicSchemas(topicName) + .orElseThrow(() -> new APIException("No schema defined for topic: " + topicName)); + } + + /** + * Get info about schemas that are conceptually owned by the supplied {@code topicName}. + * + *
<p>This differs from {@link #topicSchemas} in that it only returns schemas that should be + * registered when provisioning the topic. + * + * @param topicName the name of the topic + * @return the schema info, if any. + */ + public Optional<SchemaInfo> ownedTopicSchemas(final String topicName) { + final Channel channel = apiSpec.channels().get(topicName); + if (channel == null) { + throw new APIException("Unknown topic: " + topicName); + } + + return Optional.ofNullable(channel.publish()).map(Operation::schemaInfo); + } + + /** + * Get schema info for the supplied {@code topicName}. + * + *
<p>This differs from {@link #ownedTopicSchemas} in that it returns all known schemas + * associated with the topic. + * + * @param topicName the name of the topic + * @return stream of the schema info. + */ + public Stream<SchemaInfo> topicSchemas(final String topicName) { + final Channel channel = apiSpec.channels().get(topicName); + if (channel == null) { + throw new APIException("Unknown topic: " + topicName); + } - return apiSpec.channels().get(topicName).publish().schemaInfo(); + return Stream.of(channel.publish(), channel.subscribe()) + .filter(Objects::nonNull) + .map(Operation::schemaInfo); } /** diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java index 5843578f..58552657 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java @@ -23,6 +23,7 @@ import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.specmesh.apiparser.model.RecordPart; import io.specmesh.apiparser.model.SchemaInfo; import io.specmesh.kafka.KafkaApiSpec; import io.specmesh.kafka.provision.ExceptionWrapper; @@ -30,10 +31,13 @@ import io.specmesh.kafka.provision.Status; import io.specmesh.kafka.provision.WithState; import io.specmesh.kafka.provision.schema.SchemaReaders.SchemaReader; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -116,39 +120,62 @@ private static SchemaChangeSetCalculators.ChangeSetCalculator calculator( private static List<Schema> requiredSchemas( final KafkaApiSpec apiSpec, final String baseResourcePath) { return apiSpec.listDomainOwnedTopics().stream() - .filter( - topic -> - apiSpec.apiSpec() - .channels() - .get(topic.name()) - .publish() - .isSchemaRequired()) - .map( - (topic -> { - final var schema = Schema.builder(); - try { - final var schemaInfo = apiSpec.schemaInfoForTopic(topic.name()); - final var schemaRef = schemaInfo.schemaRef(); - final var schemaPath = Paths.get(baseResourcePath, schemaRef); - final var schemas = - new SchemaReaders.FileSystemSchemaReader() - .readLocal(schemaPath.toString()); - schema.schemas(schemas) - .type(schemaInfo.schemaRef()) - .subject( - resolveSubjectName( - topic.name(), schemas, schemaInfo)); - schema.state(CREATE); - - } catch (Provisioner.ProvisioningException ex) { - schema.state(FAILED); - schema.exception(ex); - } - return schema.build(); - })) + .flatMap(topic -> topicSchemas(apiSpec, baseResourcePath, topic.name())) .collect(Collectors.toList()); } + private static Stream<Schema> topicSchemas( + final KafkaApiSpec apiSpec, final String baseResourcePath, final String topicName) { + return apiSpec.ownedTopicSchemas(topicName).stream() + .flatMap( + si -> + Stream.of( + si.key() + .flatMap(RecordPart::schemaRef) + .map( + schemaRef -> + partSchema( + "key", + schemaRef, + si, + baseResourcePath, + topicName)), + si.value() + .schemaRef() + .map( + schemaRef -> + partSchema( + "value", + schemaRef, + si, + baseResourcePath, + topicName)))) + .flatMap(Optional::stream); + } + + private static Schema partSchema( + final String partName, + final String schemaRef, + final
SchemaInfo si, + final String baseResourcePath, + final String topicName) { + final Schema.SchemaBuilder builder = Schema.builder(); + try { + final Path schemaPath = Paths.get(baseResourcePath, schemaRef); + final Collection schemas = + new SchemaReaders.FileSystemSchemaReader().readLocal(schemaPath); + + builder.schemas(schemas) + .type(schemaRef) + .subject(resolveSubjectName(topicName, schemas, si, partName)); + builder.state(CREATE); + } catch (Provisioner.ProvisioningException ex) { + builder.state(FAILED); + builder.exception(ex); + } + return builder.build(); + } + /** * Follow these guidelines for Confluent SR and APICurio * https://docs.confluent.io/platform/6.2/schema-registry/serdes-develop/index.html#referenced-schemas @@ -175,8 +202,10 @@ private static List requiredSchemas( private static String resolveSubjectName( final String topicName, final Collection schemas, - final SchemaInfo schemaInfo) { - final var lookup = schemaInfo.schemaLookupStrategy(); + final SchemaInfo schemaInfo, + final String partName) { + final String lookup = schemaInfo.schemaLookupStrategy().orElse(""); + if (lookup.equalsIgnoreCase("SimpleTopicIdStrategy")) { return topicName; } @@ -202,7 +231,7 @@ private static String resolveSubjectName( return topicName + "-" + ((AvroSchema) next).rawSchema().getFullName(); } - return topicName + "-value"; + return topicName + "-" + partName; } private static boolean isAvro(final ParsedSchema schema) { diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaReaders.java b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaReaders.java index a5fee1f2..ab811ab6 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaReaders.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaReaders.java @@ -31,17 +31,17 @@ import io.specmesh.kafka.provision.Status; import io.specmesh.kafka.provision.schema.SchemaProvisioner.Schema; import io.specmesh.kafka.provision.schema.SchemaProvisioner.SchemaProvisioningException; -import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; import lombok.Data; import lombok.experimental.Accessors; @@ -59,38 +59,32 @@ private SchemaReaders() {} public static final class FileSystemSchemaReader { - public Collection readLocal(final String filePath) { + public Collection readLocal(final Path filePath) { try { - final var schemaContent = Files.readString(Paths.get(filePath)); + final var schemaContent = Files.readString(filePath); final var results = new ArrayList(); - if (filePath.endsWith(".avsc")) { + final String filename = + Optional.ofNullable(filePath.getFileName()) + .map(Objects::toString) + .orElse(""); + if (filename.endsWith(".avsc")) { final var refs = resolveReferencesFor(filePath, schemaContent); results.add( new AvroSchema( schemaContent, refs.references, refs.resolvedReferences, -1)); - } - if (filePath.endsWith(".yml")) { + } else if (filename.endsWith(".yml")) { results.add(new JsonSchema(schemaContent)); - } - if (filePath.endsWith(".proto")) { + } else if (filename.endsWith(".proto")) { results.add(new ProtobufSchema(schemaContent)); + } else { + throw new UnsupportedOperationException("Unsupported schema file: " + filePath); } return results; - } 
catch (Throwable ex) { - try { - throw new SchemaProvisioningException( - "Failed to load: " - + filePath - + " from: " - + new File(".").getCanonicalFile().getAbsolutePath(), - ex); - } catch (IOException e) { - throw new RuntimeException( - "Failed to read canonical path for file system:" - + new File(".").getAbsolutePath()); - } + } catch (Exception ex) { + throw new SchemaProvisioningException( + "Failed to load: " + filePath + " from: " + filePath.toAbsolutePath(), ex); } } @@ -102,11 +96,11 @@ public Collection readLocal(final String filePath) { * @return */ private SchemaReferences resolveReferencesFor( - final String filePath, final String schemaContent) { + final Path filePath, final String schemaContent) { try { final SchemaReferences results = new SchemaReferences(); final var refs = findJsonNodes(objectMapper.readTree(schemaContent), "subject"); - final var parent = new File(filePath).getParent(); + final var parent = filePath.toFile().getParent(); refs.forEach(ref -> results.add(parent, ref)); return results; } catch (JsonProcessingException e) { @@ -203,7 +197,7 @@ private List resolvePayload(final String type, final String conten private ParsedSchema parsedSchema(final String type, final String payload) { if (type.endsWith(".avsc") || type.equals("AVRO")) { - return new AvroSchema(payload); + return new AvroSchema(payload, List.of(), Map.of(), -1); } if (type.endsWith(".yml") || type.equals("JSON")) { return new JsonSchema(payload); diff --git a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java index 70be8391..853ffd47 100644 --- a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java @@ -26,6 +26,7 @@ import io.specmesh.test.TestSpecLoader; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.NewTopic; @@ -190,8 +191,9 @@ void shouldNotHaveAnyAdditionalAcls() { @Test public void shouldGetSchemaInfoForOwnedTopics() { - final List newTopics = API_SPEC.listDomainOwnedTopics(); - final SchemaInfo schemaInfo = API_SPEC.schemaInfoForTopic(newTopics.get(0).name()); - assertThat(schemaInfo.schemaIdLocation(), is("header")); + final Optional schema = + API_SPEC.ownedTopicSchemas( + "london.hammersmith.olympia.bigdatalondon._public.attendee"); + assertThat(schema.flatMap(SchemaInfo::schemaIdLocation), is(Optional.of("header"))); } } diff --git a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecWithGrantAccessAclsTest.java b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecWithGrantAccessAclsTest.java index fa61c399..7686fbfc 100644 --- a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecWithGrantAccessAclsTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecWithGrantAccessAclsTest.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.NewTopic; @@ -199,8 +200,8 @@ void shouldNotHaveAnyAdditionalAcls() { @Test public void shouldGetSchemaInfoForOwnedTopics() { - final List newTopics = API_SPEC.listDomainOwnedTopics(); - final SchemaInfo schemaInfo = API_SPEC.schemaInfoForTopic(newTopics.get(0).name()); - assertThat(schemaInfo.schemaIdLocation(), is("header")); + final Optional schema = + 
API_SPEC.ownedTopicSchemas("london.hammersmith.olympia.bigdatalondon.attendee"); + assertThat(schema.flatMap(SchemaInfo::schemaIdLocation), is(Optional.of("header"))); } } diff --git a/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java index b376391c..01e3eb77 100644 --- a/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java @@ -197,7 +197,7 @@ void shouldDryRunSchemasFromEmptyCluster() throws RestClientException, IOExcepti assertThat( "dry run should leave changeset in 'create' state", changeset.stream().filter(topic -> topic.state() == Status.STATE.CREATE).count(), - is(2L)); + is(3L)); // Verify - should have 2 SR entries final var allSubjects = srClient.getAllSubjects(); @@ -341,12 +341,12 @@ void shouldPublishSchemasFromEmptyCluster() throws RestClientException, IOExcept // Verify - 11 created assertThat( changeset.stream().filter(topic -> topic.state() == Status.STATE.CREATED).count(), - is(2L)); + is(3L)); - // Verify - should have 2 SR entries + // Verify - should have 3 SR entries final var allSubjects = srClient.getAllSubjects(); - assertThat(allSubjects, is(hasSize(2))); + assertThat(allSubjects, is(hasSize(3))); final var schemas = srClient.getSchemas("simple", false, false); @@ -358,6 +358,7 @@ void shouldPublishSchemasFromEmptyCluster() throws RestClientException, IOExcept is( containsInAnyOrder( "io.specmesh.kafka.schema.UserInfo", + "simple.provision_demo._public.user_signed_up_value.key.UserSignedUpKey", "simple.provision_demo._public.user_signed_up_value.UserSignedUp"))); } diff --git a/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerFunctionalTest.java index 9314d001..8e735566 100644 --- a/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerFunctionalTest.java @@ -36,8 +36,11 @@ import io.specmesh.kafka.provision.schema.SchemaProvisioner.Schema; import io.specmesh.test.TestSpecLoader; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; @@ -59,6 +62,7 @@ class SchemaProvisionerFunctionalTest { private static final KafkaApiSpec API_UPDATE_SPEC = TestSpecLoader.loadFromClassPath("provisioner-update-functional-test-api.yaml"); + private SchemaRegistryClient srClient; private enum Domain { /** The domain associated with the spec. 
*/ @@ -87,20 +91,47 @@ private enum Domain { Domain.SELF.domainId + "-secret") .build(); + @BeforeEach + void setUp() { + srClient = KAFKA_ENV.srClient(); + } + + @AfterEach + void tearDown() throws Exception { + srClient.close(); + } + /** setup for following update tests */ @Test @Order(1) - void shouldProvisionExistingSpec() { - SchemaProvisioner.provision( - false, false, API_SPEC, "./build/resources/test", KAFKA_ENV.srClient()); + void shouldProvisionExistingSpec() throws Exception { + // When: + final Collection<Schema> provisioned = + SchemaProvisioner.provision( + false, false, API_SPEC, "./build/resources/test", srClient); + + // Then: should have provisioned 3 schemas: + assertThat( + provisioned.stream().filter(topic -> topic.state() == STATE.CREATED).count(), + is(3L)); + + assertThat( + srClient.getAllSubjects(), + containsInAnyOrder( + "simple.provision_demo._public.user_signed_up-key", + "simple.provision_demo._public.user_signed_up-value", + "simple.provision_demo._protected.user_info-value")); + + assertThat( + srClient.getAllSubjectsByPrefix(API_SPEC.id()), + hasSize(srClient.getAllSubjects().size())); } @Test @Order(2) - void shouldPublishUpdatedSchemas() throws RestClientException, IOException { + void shouldPublishUpdatedSchemas() throws Exception { - final var srClient = KAFKA_ENV.srClient(); - final var dryRunChangeset = + final Collection<Schema> dryRunChangeset = SchemaProvisioner.provision( true, false, API_UPDATE_SPEC, "./build/resources/test", srClient); @@ -109,12 +140,15 @@ void shouldPublishUpdatedSchemas() throws RestClientException, IOException { dryRunChangeset.stream().filter(topic -> topic.state() == STATE.UPDATE).count(), is(1L)); - // Verify - should have 2 SR entries (1 was updated, 1 was from original spec) - final var allSubjects = srClient.getAllSubjects(); - - assertThat(allSubjects, is(hasSize(2))); + // Verify - should have 3 SR entries (1 was updated, 2 were from the original spec) + assertThat( + srClient.getAllSubjects(), + containsInAnyOrder( + "simple.provision_demo._public.user_signed_up-key", + "simple.provision_demo._public.user_signed_up-value", + "simple.provision_demo._protected.user_info-value")); - final var updateChangeset = + final Collection<Schema> updateChangeset = SchemaProvisioner.provision( false, false, API_UPDATE_SPEC, "./build/resources/test", srClient); @@ -136,6 +170,7 @@ void shouldPublishUpdatedSchemas() throws RestClientException, IOException { is( containsInAnyOrder( "io.specmesh.kafka.schema.UserInfo", + "simple.provision_demo._public.user_signed_up_value.key.UserSignedUpKey", "simple.provision_demo._public.user_signed_up_value.UserSignedUp"))); } @@ -162,45 +197,35 @@ void shouldRemoveUnspecdSchemas() throws RestClientException, IOException { .state(STATE.READ) .build(); - try (SchemaRegistryClient srClient = KAFKA_ENV.srClient()) { + // insert the bad schema + srClient.register(subject, schema.getSchema()); - // insert the bad schema - srClient.register(subject, schema.getSchema()); - - testDryRun(subject, srClient); - testCleanUnSpecSchemas(srClient); - } + testDryRun(subject); + testCleanUnSpecSchemas(); } - private static void testCleanUnSpecSchemas(final SchemaRegistryClient srClient) - throws IOException, RestClientException { + private void testCleanUnSpecSchemas() throws IOException, RestClientException { final var cleanerSet2 = SchemaProvisioner.provision( - false, - true, - API_UPDATE_SPEC, - "./build/resources/test", - KAFKA_ENV.srClient()); + false, true, API_UPDATE_SPEC, "./build/resources/test", srClient); // verify it was
removed assertThat(cleanerSet2.iterator().next().state(), is(STATE.DELETED)); - final var allSchemasforSpec = srClient.getAllSubjectsByPrefix(API_SPEC.id()); - // verify removal - assertThat(allSchemasforSpec, is(hasSize(2))); + assertThat( + srClient.getAllSubjectsByPrefix(API_SPEC.id()), + containsInAnyOrder( + "simple.provision_demo._public.user_signed_up-key", + "simple.provision_demo._public.user_signed_up-value", + "simple.provision_demo._protected.user_info-value")); } - private static void testDryRun(final String subject, final SchemaRegistryClient srClient) - throws IOException, RestClientException { + private void testDryRun(final String subject) throws IOException, RestClientException { // test dry run final var cleanerSet = SchemaProvisioner.provision( - true, - true, - API_UPDATE_SPEC, - "./build/resources/test", - KAFKA_ENV.srClient()); + true, true, API_UPDATE_SPEC, "./build/resources/test", srClient); // verify dry run assertThat(cleanerSet, is(hasSize(1))); @@ -211,6 +236,6 @@ private static void testDryRun(final String subject, final SchemaRegistryClient assertThat(cleanerSet.iterator().next().state(), is(STATE.DELETE)); final var allSchemasforId = srClient.getAllSubjectsByPrefix(API_SPEC.id()); - assertThat(allSchemasforId, is(hasSize(3))); + assertThat(allSchemasforId, is(hasSize(4))); } } diff --git a/kafka/src/test/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculatorsTest.java b/kafka/src/test/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculatorsTest.java index eb8297e3..a178028c 100644 --- a/kafka/src/test/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculatorsTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculatorsTest.java @@ -28,6 +28,7 @@ import io.specmesh.kafka.provision.Provisioner; import io.specmesh.kafka.provision.Status; import io.specmesh.kafka.provision.schema.SchemaProvisioner.Schema; +import java.nio.file.Path; import java.util.List; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -82,8 +83,8 @@ void shouldOutputMessagesOnBorkedSchema() throws Exception { assertThat(schemas.iterator().next().messages(), is(containsString("borked"))); } - private static String filename(final String extra) { - return "./build/resources/test/schema/" + SCHEMA_FILENAME + extra; + private static Path filename(final String extra) { + return Path.of("./build/resources/test/schema/" + SCHEMA_FILENAME + extra); } static ParsedSchema getSchema(final String schemaRefType, final String content) { diff --git a/kafka/src/test/resources/bigdatalondon-api-with-grant-access-acls.yaml b/kafka/src/test/resources/bigdatalondon-api-with-grant-access-acls.yaml index 0c0a153c..011c7f0a 100644 --- a/kafka/src/test/resources/bigdatalondon-api-with-grant-access-acls.yaml +++ b/kafka/src/test/resources/bigdatalondon-api-with-grant-access-acls.yaml @@ -71,6 +71,10 @@ channels: tags: [ name: "grant-access:some.other.domain.root" ] + bindings: + kafka: + key: + type: long message: name: Food Item tags: [ @@ -107,6 +111,10 @@ channels: name: "human", name: "customer" ] + bindings: + kafka: + key: + type: long payload: type: object properties: diff --git a/kafka/src/test/resources/bigdatalondon-api.yaml b/kafka/src/test/resources/bigdatalondon-api.yaml index e3000bfa..fb5d44d1 100644 --- a/kafka/src/test/resources/bigdatalondon-api.yaml +++ b/kafka/src/test/resources/bigdatalondon-api.yaml @@ -68,6 +68,10 @@ channels: ] message: name: Food Item + bindings: + kafka: + key: + 
type: long tags: [ name: "human", name: "purchase" @@ -97,6 +101,10 @@ channels: summary: Humans customers message: name: Food Item + bindings: + kafka: + key: + type: long tags: [ name: "human", name: "customer" @@ -123,6 +131,10 @@ channels: summary: Humans arriving in the borough message: name: Human + bindings: + kafka: + key: + type: long payload: type: object properties: diff --git a/kafka/src/test/resources/clientapi-functional-test-api.yaml b/kafka/src/test/resources/clientapi-functional-test-api.yaml index 699fa43c..0c826d78 100644 --- a/kafka/src/test/resources/clientapi-functional-test-api.yaml +++ b/kafka/src/test/resources/clientapi-functional-test-api.yaml @@ -35,6 +35,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" @@ -62,6 +64,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" diff --git a/kafka/src/test/resources/exporter-expected-spec.yaml b/kafka/src/test/resources/exporter-expected-spec.yaml index f3f15f23..49ad1488 100644 --- a/kafka/src/test/resources/exporter-expected-spec.yaml +++ b/kafka/src/test/resources/exporter-expected-spec.yaml @@ -1,19 +1,14 @@ --- id: "urn:asyncapi-id" version: "version-123" -asyncapi: null channels: one-topic-channel: description: "one-topic-channel-description" bindings: kafka: - envs: [] partitions: 1 replicas: 3 - configs: {} groupId: "kafka-binding-group-id" schemaIdLocation: "payload" schemaLookupStrategy: "TopicNameStrategy" - bindingVersion: null - publish: null - subscribe: null + bindingVersion: "latest" diff --git a/kafka/src/test/resources/provisioner-functional-test-api.yaml b/kafka/src/test/resources/provisioner-functional-test-api.yaml index 12752941..ee2ef44e 100644 --- a/kafka/src/test/resources/provisioner-functional-test-api.yaml +++ b/kafka/src/test/resources/provisioner-functional-test-api.yaml @@ -28,19 +28,19 @@ channels: cleanup.policy: delete retention.ms: 3600000 - publish: summary: Inform about signup operationId: onUserSignedUp message: bindings: kafka: + key: + $ref: schema/simple.provision_demo._public.user_signed_up.key.avsc schemaIdLocation: "payload" schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" payload: $ref: "/schema/simple.provision_demo._public.user_signed_up.avsc" - _protected.user_info: bindings: kafka: @@ -62,6 +62,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" diff --git a/kafka/src/test/resources/provisioner-update-functional-test-api.yaml b/kafka/src/test/resources/provisioner-update-functional-test-api.yaml index 7e77b499..2652e560 100644 --- a/kafka/src/test/resources/provisioner-update-functional-test-api.yaml +++ b/kafka/src/test/resources/provisioner-update-functional-test-api.yaml @@ -34,6 +34,8 @@ channels: message: bindings: kafka: + key: + $ref: schema/simple.provision_demo._public.user_signed_up.key.avsc schemaIdLocation: "payload" schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" @@ -61,6 +63,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" diff --git 
a/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.key.avsc b/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.key.avsc new file mode 100644 index 00000000..ffdb8351 --- /dev/null +++ b/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.key.avsc @@ -0,0 +1,8 @@ +{ + "type": "record", + "namespace": "simple.provision_demo._public.user_signed_up_value.key", + "name": "UserSignedUpKey", + "fields": [ + {"name": "id", "type": "int"} + ] +} \ No newline at end of file diff --git a/parser/build.gradle.kts b/parser/build.gradle.kts index 8e04f2a9..c3fee22c 100644 --- a/parser/build.gradle.kts +++ b/parser/build.gradle.kts @@ -27,6 +27,7 @@ dependencies { api("com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion") implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jacksonVersion") + implementation("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion") compileOnly("org.projectlombok:lombok:$lombokVersion") annotationProcessor("org.projectlombok:lombok:$lombokVersion") diff --git a/parser/src/main/java/io/specmesh/apiparser/AsyncApiParser.java b/parser/src/main/java/io/specmesh/apiparser/AsyncApiParser.java index 2e4a97ce..33951de6 100644 --- a/parser/src/main/java/io/specmesh/apiparser/AsyncApiParser.java +++ b/parser/src/main/java/io/specmesh/apiparser/AsyncApiParser.java @@ -16,9 +16,8 @@ package io.specmesh.apiparser; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import io.specmesh.apiparser.model.ApiSpec; +import io.specmesh.apiparser.parse.SpecMapper; import java.io.IOException; import java.io.InputStream; @@ -36,7 +35,7 @@ public final ApiSpec loadResource(final InputStream inputStream) throws IOExcept if (inputStream == null || inputStream.available() == 0) { throw new RuntimeException("Not found"); } - return new ObjectMapper(new YAMLFactory()).readValue(inputStream, ApiSpec.class); + return SpecMapper.mapper().readValue(inputStream, ApiSpec.class); } public static class APIParserException extends RuntimeException { diff --git a/parser/src/main/java/io/specmesh/apiparser/model/Bindings.java b/parser/src/main/java/io/specmesh/apiparser/model/Bindings.java index b6b40484..0d15a41b 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/Bindings.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/Bindings.java @@ -34,7 +34,7 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class Bindings { - @JsonProperty KafkaBinding kafka; + @JsonProperty private KafkaBinding kafka; public void validate() { kafka.validate(); diff --git a/parser/src/main/java/io/specmesh/apiparser/model/KafkaBinding.java b/parser/src/main/java/io/specmesh/apiparser/model/KafkaBinding.java index 4a22c55c..15d4158c 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/KafkaBinding.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/KafkaBinding.java @@ -19,10 +19,9 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -38,16 +37,15 @@ */ /** Pojo representing a Kafka binding */ +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") @Builder @Data 
@AllArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE) @Accessors(fluent = true) @JsonIgnoreProperties(ignoreUnknown = true) -@SuppressWarnings({"unchecked"}) @SuppressFBWarnings public class KafkaBinding { - private static final int DAYS_TO_MS = 24 * 60 * 60 * 1000; @Builder.Default @JsonProperty private List envs = List.of(); @@ -57,19 +55,23 @@ public class KafkaBinding { @Builder.Default @JsonProperty private Map configs = Map.of(); - @JsonProperty private String groupId; + @Builder.Default @JsonProperty private String groupId = ""; @Builder.Default @JsonProperty private String schemaIdLocation = "payload"; + @Builder.Default @JsonProperty private String schemaIdPayloadEncoding = ""; + @Builder.Default @JsonProperty private String schemaLookupStrategy = "TopicNameStrategy"; - @JsonProperty private String bindingVersion; + @Builder.Default @JsonProperty private String bindingVersion = "latest"; + + @Builder.Default @JsonProperty private Optional key = Optional.empty(); /** * @return configs */ public Map configs() { - return new LinkedHashMap<>(configs == null ? Collections.emptyMap() : configs); + return Map.copyOf(configs); } /** diff --git a/parser/src/main/java/io/specmesh/apiparser/model/Message.java b/parser/src/main/java/io/specmesh/apiparser/model/Message.java index 1c1a5136..e094fa0a 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/Message.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/Message.java @@ -19,8 +19,10 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.specmesh.apiparser.AsyncApiParser.APIParserException; import java.util.List; import java.util.Map; +import java.util.Optional; import lombok.Value; import lombok.experimental.Accessors; @@ -40,7 +42,7 @@ public final class Message { @JsonProperty private Map headers; - @JsonProperty private Map payload; + @JsonProperty private RecordPart payload; @JsonProperty private Map correlationId; @@ -70,7 +72,7 @@ private Message() { this.tags = List.of(); this.traits = Map.of(); this.messageId = null; - this.payload = Map.of(); + this.payload = null; this.name = null; this.title = null; this.summary = null; @@ -79,9 +81,22 @@ private Message() { } /** - * @return the location of the schema + * @return The schemas in use */ - public String schemaRef() { - return (String) payload.get("$ref"); + public SchemaInfo schemas() { + if (payload == null) { + throw new APIParserException("missing 'message.payload' definition"); + } + + final Optional kafkaBinding = + Optional.ofNullable(bindings).map(Bindings::kafka); + + return new SchemaInfo( + kafkaBinding.flatMap(KafkaBinding::key), + payload, + Optional.ofNullable(schemaFormat), + kafkaBinding.map(KafkaBinding::schemaIdLocation), + Optional.ofNullable(contentType), + kafkaBinding.map(KafkaBinding::schemaLookupStrategy)); } } diff --git a/parser/src/main/java/io/specmesh/apiparser/model/Operation.java b/parser/src/main/java/io/specmesh/apiparser/model/Operation.java index 608e88c2..b39e3176 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/Operation.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/Operation.java @@ -54,10 +54,7 @@ public class Operation { @JsonProperty private String description; - @SuppressWarnings("rawtypes") - @JsonProperty - @Builder.Default - private List tags = List.of(); + @JsonProperty @Builder.Default private List tags = List.of(); 
@JsonProperty private Bindings bindings; @@ -70,16 +67,13 @@ public class Operation { * @return schema info */ public SchemaInfo schemaInfo() { - if (message.bindings() == null) { + try { + return message.schemas(); + } catch (Exception e) { throw new APIParserException( - "Bindings not found for (publish|subscribe) operation: " + operationId); + "Error extracting schemas from (publish|subscribe) operation: " + operationId, + e); } - return new SchemaInfo( - message().schemaRef(), - message().schemaFormat(), - message.bindings().kafka().schemaIdLocation(), - message().contentType(), - message.bindings().kafka().schemaLookupStrategy()); } public void validate() { @@ -87,10 +81,4 @@ public void validate() { throw new APIParserException("(publish|subscribe) operationId is null"); } } - - public boolean isSchemaRequired() { - return this.message() != null - && this.message().schemaRef() != null - && this.message().schemaRef().length() > 0; - } } diff --git a/parser/src/main/java/io/specmesh/apiparser/model/RecordPart.java b/parser/src/main/java/io/specmesh/apiparser/model/RecordPart.java new file mode 100644 index 00000000..b7513d2a --- /dev/null +++ b/parser/src/main/java/io/specmesh/apiparser/model/RecordPart.java @@ -0,0 +1,211 @@ +/* + * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.specmesh.apiparser.model; + +import static java.util.Objects.requireNonNull; + +import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import io.specmesh.apiparser.AsyncApiParser; +import io.specmesh.apiparser.util.JsonLocation; +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.Value; + +/** + * Metadata describing a part of a Kafka record. + * + *
<p>e.g. the key or value of a record. + */ +@JsonDeserialize(using = RecordPart.Deserializer.class) +public interface RecordPart { + /** + * @return reference to an external schema file. + */ + default Optional<String> schemaRef() { + return Optional.empty(); + } + + /** + * @return the inbuilt Kafka type, if specified. + */ + default Optional<KafkaType> kafkaType() { + return Optional.empty(); + } + + // Types supported by standard Kafka serializers: + enum KafkaType { + UUID("uuid"), + Long("long"), + Int("int"), + Short("short"), + Float("float"), + Double("double"), + String("string"), + Bytes("bytes"), + Void("void"); + + private static final String VALID_VALUES = + Arrays.stream(values()) + .map(RecordPart.KafkaType::toString) + .collect(Collectors.joining(", ", "[", "]")); + + private final String text; + + KafkaType(final String text) { + this.text = requireNonNull(text, "text"); + } + + @JsonValue + @Override + public String toString() { + return text; + } + + public static KafkaPart.KafkaType fromText(final String text) { + return Arrays.stream(values()) + .filter(t -> t.text.equals(text)) + .findFirst() + .orElseThrow( + () -> + new IllegalArgumentException( + "Unknown KafkaType: " + + text + + ". Valid values are: " + + VALID_VALUES)); + } + } + + @Value + final class KafkaPart implements RecordPart { + + private final KafkaPart.KafkaType kafkaType; + + public KafkaPart(final KafkaPart.KafkaType kafkaType) { + this.kafkaType = requireNonNull(kafkaType, "kafkaType"); + } + + @Override + public Optional<KafkaType> kafkaType() { + return Optional.of(kafkaType); + } + } + + @Value + final class SchemaRefPart implements RecordPart { + + private final String schemaRef; + + public SchemaRefPart(final String schemaRef) { + this.schemaRef = requireNonNull(schemaRef, "schemaRef"); + } + + public Optional<String> schemaRef() { + return Optional.of(schemaRef); + } + } + + @Value + final class OtherPart implements RecordPart { + + private final JsonNode node; + + public OtherPart(final JsonNode node) { + this.node = requireNonNull(node, "node"); + } + } + + final class Deserializer extends JsonDeserializer<RecordPart> { + + private static final String REF_FIELD = "$ref"; + private static final String TYPE_FIELD = "type"; + + @Override + public RecordPart deserialize(final JsonParser parser, final DeserializationContext ctx) + throws IOException { + final URI location = JsonLocation.location(parser); + + if (parser.currentToken() != JsonToken.START_OBJECT) { + throw new AsyncApiParser.APIParserException( + "Record part should be an object with either " + + REF_FIELD + + " or " + + TYPE_FIELD + + ". location: " + + location); + } + + parser.nextToken(); + + Optional<String> ref = Optional.empty(); + Optional<String> type = Optional.empty(); + Optional<JsonNode> props = Optional.empty(); + + while (parser.currentToken() != JsonToken.END_OBJECT) { + final String fieldName = parser.currentName(); + parser.nextToken(); + + switch (fieldName) { + case REF_FIELD: + ref = Optional.of(ctx.readValue(parser, String.class)); + break; + case TYPE_FIELD: + type = Optional.of(ctx.readValue(parser, String.class)); + break; + case "properties": + props = Optional.of(ctx.readTree(parser)); + break; + default: + ctx.readTree(parser); + break; + } + + parser.nextToken(); + } + + if (ref.isPresent() == type.isPresent()) { + throw new AsyncApiParser.APIParserException( + "Record part requires either " + + REF_FIELD + + " or " + + TYPE_FIELD + + ", but not both.
location: " + location); } + + if (ref.isPresent()) { + return new SchemaRefPart(ref.get()); + } + + if (!"object".equals(type.get())) { + return new KafkaPart(KafkaType.fromText(type.get())); + } + + return new OtherPart( + props.orElseThrow( + () -> + new AsyncApiParser.APIParserException( + "'object' types require an inline 'properties'. location: " + + location))); + } + } +} diff --git a/parser/src/main/java/io/specmesh/apiparser/model/SchemaInfo.java b/parser/src/main/java/io/specmesh/apiparser/model/SchemaInfo.java index d2512d5c..58293a94 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/SchemaInfo.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/SchemaInfo.java @@ -18,33 +18,39 @@ import static java.util.Objects.requireNonNull; +import java.util.Optional; import lombok.Value; import lombok.experimental.Accessors; /** Pojo representing a schmea */ +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") @Value @Accessors(fluent = true) public class SchemaInfo { - private final String schemaRef; - private final String schemaFormat; - private final String schemaIdLocation; - private final String contentType; - private String schemaLookupStrategy; + private final Optional<RecordPart> key; + private final RecordPart value; + private final Optional<String> schemaFormat; + private final Optional<String> schemaIdLocation; + private final Optional<String> contentType; + private final Optional<String> schemaLookupStrategy; /** - * @param schemaRef location of schema + * @param key schema info for the key of the Kafka Record + * @param value schema info for the value of the Kafka Record * @param schemaFormat format of schema * @param schemaIdLocation header || payload * @param contentType content type of schema * @param schemaLookupStrategy schema lookup strategy */ public SchemaInfo( - final String schemaRef, - final String schemaFormat, - final String schemaIdLocation, - final String contentType, - final String schemaLookupStrategy) { - this.schemaRef = schemaRef; + final Optional<RecordPart> key, + final RecordPart value, + final Optional<String> schemaFormat, + final Optional<String> schemaIdLocation, + final Optional<String> contentType, + final Optional<String> schemaLookupStrategy) { + this.key = requireNonNull(key, "key"); + this.value = requireNonNull(value, "value"); this.schemaFormat = requireNonNull(schemaFormat, "schemaFormat"); this.schemaIdLocation = requireNonNull(schemaIdLocation, "schemaIdLocation"); this.contentType = requireNonNull(contentType, "contentType");
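A short sketch of the three shapes the `RecordPart` deserializer above accepts (external `$ref`, inbuilt `type`, and inline `object` with `properties`), read via the `SpecMapper` introduced in the next hunk. The YAML literals and class name are illustrative only:

```java
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.specmesh.apiparser.model.RecordPart;
import io.specmesh.apiparser.parse.SpecMapper;

public final class RecordPartShapes {
    public static void main(final String[] args) throws Exception {
        final JsonMapper mapper = SpecMapper.mapper();

        // An external schema file reference deserializes to a SchemaRefPart:
        final RecordPart ref = mapper.readValue("$ref: key.avsc", RecordPart.class);
        // An inbuilt Kafka type deserializes to a KafkaPart:
        final RecordPart inbuilt = mapper.readValue("type: long", RecordPart.class);
        // An inline 'object' schema deserializes to an OtherPart (parsed, but not provisioned):
        final RecordPart inline =
                mapper.readValue("type: object\nproperties:\n  id: {}", RecordPart.class);

        System.out.println(ref.schemaRef() + " / " + inbuilt.kafkaType() + " / " + inline);
    }
}
```

Supplying both `$ref` and `type`, or neither, fails with an `APIParserException` that reports the offending file location.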
diff --git a/parser/src/main/java/io/specmesh/apiparser/parse/SpecMapper.java b/parser/src/main/java/io/specmesh/apiparser/parse/SpecMapper.java new file mode 100644 index 00000000..9afb6441 --- /dev/null +++ b/parser/src/main/java/io/specmesh/apiparser/parse/SpecMapper.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.specmesh.apiparser.parse; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; + +/** ObjectMapper for use when (de)serializing AsyncAPI / SpecMesh specs. */ +public final class SpecMapper { + + private SpecMapper() {} + + /** + * @return a mapper configured for reading and writing specs. + */ + public static JsonMapper mapper() { + return JsonMapper.builder(new YAMLFactory()) + .addModule(new Jdk8Module()) + .serializationInclusion(JsonInclude.Include.NON_EMPTY) + .build(); + } +}
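A sketch of the serialization behaviour this mapper configures: `NON_EMPTY` inclusion plus the `Jdk8Module` drop null fields, empty maps and empty Optionals on write, which is why the expected exporter YAML above no longer lists `null`/empty entries. The `Sample` class is a stand-in, assuming nothing beyond `SpecMapper` itself:

```java
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.specmesh.apiparser.parse.SpecMapper;
import java.util.Map;
import java.util.Optional;

public final class NonEmptyDemo {

    // Stand-in for a spec model class: public fields, some of them empty.
    static final class Sample {
        public String name = "one-topic-channel";
        public String missing = null;                   // null: dropped by NON_EMPTY
        public Map<String, String> configs = Map.of();  // empty map: dropped
        public Optional<String> key = Optional.empty(); // empty Optional: dropped via Jdk8Module
    }

    public static void main(final String[] args) throws Exception {
        final JsonMapper mapper = SpecMapper.mapper();
        // Prints only the non-empty field, e.g.: name: "one-topic-channel"
        System.out.println(mapper.writeValueAsString(new Sample()));
    }
}
```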
diff --git a/parser/src/main/java/io/specmesh/apiparser/util/JsonLocation.java b/parser/src/main/java/io/specmesh/apiparser/util/JsonLocation.java new file mode 100644 index 00000000..fa2a55cf --- /dev/null +++ b/parser/src/main/java/io/specmesh/apiparser/util/JsonLocation.java @@ -0,0 +1,61 @@ +/* + * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.specmesh.apiparser.util; + +import com.fasterxml.jackson.core.JsonParser; +import java.io.File; +import java.net.URI; + +/** + * Util class for determining the current location within a JSON/YAML file being parsed. + * + * <p>Locations are in the URI form: {@code file:///path/to/file.yml:<lineNumber>}. + */ +public final class JsonLocation { + + private static final URI UNKNOWN = URI.create("unknown"); + + private JsonLocation() {} + + /** + * Get the current location from the parser. + * + * @param parser the parser. + * @return the current location. + */ + public static URI location(final JsonParser parser) { + return location(parser.currentLocation()); + } + + /** + * Get the location from the Jackson location. + * + * @param location the Jackson location + * @return the location. + */ + public static URI location(final com.fasterxml.jackson.core.JsonLocation location) { + final Object content = location.contentReference().getRawContent(); + if (!(content instanceof File)) { + return UNKNOWN; + } + + final String filePath = + ((File) content).toURI().toString().replaceFirst("file:/", "file:///"); + + return URI.create(filePath + ":" + location.getLineNr()); + } +} diff --git a/parser/src/test/java/io/specmesh/apiparser/AsyncApiSchemaParserTest.java b/parser/src/test/java/io/specmesh/apiparser/AsyncApiSchemaParserTest.java index e2b709a5..4b1a1652 100644 --- a/parser/src/test/java/io/specmesh/apiparser/AsyncApiSchemaParserTest.java +++ b/parser/src/test/java/io/specmesh/apiparser/AsyncApiSchemaParserTest.java @@ -24,7 +24,11 @@ import io.specmesh.apiparser.model.ApiSpec; import io.specmesh.apiparser.model.Channel; import io.specmesh.apiparser.model.Operation; +import io.specmesh.apiparser.model.RecordPart; +import io.specmesh.apiparser.model.RecordPart.KafkaPart; +import io.specmesh.apiparser.model.RecordPart.SchemaRefPart; import java.util.Map; +import java.util.Optional; import org.junit.jupiter.api.Test; public class AsyncApiSchemaParserTest { @@ -40,36 +44,55 @@ public void shouldReturnProducerMessageSchema() { channelsMap.keySet(), hasItem(".simple.schema-demo._public.user.signed")); - final Operation publish = - channelsMap.get(".simple.schema-demo._public.user.signed").publish(); - assertThat(publish.message().schemaRef(), is("simple_schema_demo_user-signedup.avsc")); + final Operation op = channelsMap.get(".simple.schema-demo._public.user.signed").publish(); assertThat( - publish.message().schemaFormat(), - is("application/vnd.apache.avro+json;version=1.9.0")); - assertThat(publish.message().contentType(), is("application/octet-stream")); - assertThat(publish.message().bindings().kafka().schemaIdLocation(), is("payload")); - assertThat(publish.schemaInfo().schemaIdLocation(), is("payload")); + op.message().schemaFormat(), is("application/vnd.apache.avro+json;version=1.9.0")); + assertThat(op.message().contentType(), is("application/octet-stream")); + assertThat(op.message().bindings().kafka().schemaIdLocation(), is("payload")); + assertThat( + op.message().bindings().kafka().key(), + is(Optional.of(new SchemaRefPart("key.avsc")))); + assertThat( + op.message().payload().schemaRef(), + is(Optional.of("simple_schema_demo_user-signedup.avsc"))); + + assertThat(op.schemaInfo().schemaFormat(), is(Optional.of(op.message().schemaFormat()))); + assertThat(op.schemaInfo().contentType(), is(Optional.of(op.message().contentType()))); + assertThat( + op.schemaInfo().schemaIdLocation(), + is(Optional.of(op.message().bindings().kafka().schemaIdLocation()))); + assertThat(op.schemaInfo().key(), is(op.message().bindings().kafka().key())); + assertThat(op.schemaInfo().value(), is(op.message().payload())); } @Test public void shouldReturnSubscriberMessageSchema() { + // Given: final Map<String, Channel> channelsMap = API_SPEC.channels(); -
-        assertThat(channelsMap.entrySet(), hasSize(2));
-        assertThat(
-                "Should have assembled 'id + channelname'",
-                channelsMap.keySet(),
-                hasItem("london.hammersmith.transport._public.tube"));
-        final Operation subscribe =
+        // When:
+        final Operation op =
                 channelsMap.get("london.hammersmith.transport._public.tube").subscribe();
+
+        // Then:
         assertThat(
-                subscribe.message().schemaRef(),
-                is("london_hammersmith_transport_public_passenger.avsc"));
+                op.message().schemaFormat(), is("application/vnd.apache.avro+json;version=1.9.0"));
+        assertThat(op.message().contentType(), is("application/octet-stream"));
+        assertThat(op.message().bindings().kafka().schemaIdLocation(), is("header"));
+        assertThat(
+                op.message().bindings().kafka().key(),
+                is(Optional.of(new KafkaPart(RecordPart.KafkaType.String))));
+        assertThat(
+                op.message().payload().schemaRef(),
+                is(Optional.of("london_hammersmith_transport_public_passenger.avsc")));
+
+        assertThat(op.schemaInfo().schemaFormat(), is(Optional.of(op.message().schemaFormat())));
+        assertThat(op.schemaInfo().contentType(), is(Optional.of(op.message().contentType())));
         assertThat(
-                subscribe.message().schemaFormat(),
-                is("application/vnd.apache.avro+json;version=1.9.0"));
-        assertThat(subscribe.message().contentType(), is("application/octet-stream"));
-        assertThat(subscribe.message().bindings().kafka().schemaIdLocation(), is("header"));
+                op.schemaInfo().schemaIdLocation(),
+                is(Optional.of(op.message().bindings().kafka().schemaIdLocation())));
+        assertThat(op.schemaInfo().key(), is(op.message().bindings().kafka().key()));
+        assertThat(op.schemaInfo().value(), is(op.message().payload()));
     }
 }
diff --git a/parser/src/test/java/io/specmesh/apiparser/model/MessageTest.java b/parser/src/test/java/io/specmesh/apiparser/model/MessageTest.java
new file mode 100644
index 00000000..6f18c18c
--- /dev/null
+++ b/parser/src/test/java/io/specmesh/apiparser/model/MessageTest.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.specmesh.apiparser.model;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import io.specmesh.apiparser.AsyncApiParser.APIParserException;
+import io.specmesh.apiparser.model.RecordPart.KafkaPart;
+import io.specmesh.apiparser.model.RecordPart.OtherPart;
+import io.specmesh.apiparser.model.RecordPart.SchemaRefPart;
+import io.specmesh.apiparser.parse.SpecMapper;
+import java.util.Optional;
+import org.junit.jupiter.api.Test;
+
+class MessageTest {
+
+    private static final JsonMapper MAPPER = SpecMapper.mapper();
+
+    @Test
+    void shouldHandleMissingKey() throws Exception {
+        // Given:
+        final String json = "bindings:\n" + "  kafka:\n" + "payload:\n" + "  type: string";
+
+        final Message message = MAPPER.readValue(json, Message.class);
+
+        // When:
+        final SchemaInfo schemas = message.schemas();
+
+        // Then:
+        assertThat(schemas.key(), is(Optional.empty()));
+    }
+
+    @Test
+    void shouldHandleMissingKafkaBindings() throws Exception {
+        // Given:
+        final String json = "bindings:\n" + "payload:\n" + "  type: string";
+
+        final Message message = MAPPER.readValue(json, Message.class);
+
+        // When:
+        final SchemaInfo schemas = message.schemas();
+
+        // Then:
+        assertThat(schemas.key(), is(Optional.empty()));
+    }
+
+    @Test
+    void shouldHandleMissingBindings() throws Exception {
+        // Given:
+        final String json = "payload:\n" + "  type: string";
+
+        final Message message = MAPPER.readValue(json, Message.class);
+
+        // When:
+        final SchemaInfo schemas = message.schemas();
+
+        // Then:
+        assertThat(schemas.key(), is(Optional.empty()));
+    }
+
+    @Test
+    void shouldThrowOnMissingPayload() throws Exception {
+        // Given:
+        final String json = "bindings:\n" + "  kafka:\n" + "    key:\n" + "      type: long";
+
+        final Message message = MAPPER.readValue(json, Message.class);
+
+        // When:
+        final Exception e = assertThrows(APIParserException.class, message::schemas);
+
+        // Then:
+        assertThat(e.getMessage(), startsWith("missing 'message.payload' definition"));
+    }
+
+    @Test
+    void shouldExtractInbuiltKafkaSchemaInfo() throws Exception {
+        // Given:
+        final String json =
+                "bindings:\n"
+                        + "  kafka:\n"
+                        + "    key:\n"
+                        + "      type: long\n"
+                        + "payload:\n"
+                        + "  type: string";
+
+        final Message message = MAPPER.readValue(json, Message.class);
+
+        // When:
+        final SchemaInfo schemas = message.schemas();
+
+        // Then:
+        assertThat(schemas.key(), is(Optional.of(new KafkaPart(RecordPart.KafkaType.Long))));
+        assertThat(schemas.value(), is(new KafkaPart(RecordPart.KafkaType.String)));
+    }
+
+    @Test
+    void shouldExtractSchemaRefInfo() throws Exception {
+        // Given:
+        final String json =
+                "bindings:\n"
+                        + "  kafka:\n"
+                        + "    key:\n"
+                        + "      $ref: key.schema\n"
+                        + "payload:\n"
+                        + "  $ref: value.schema";
+
+        final Message message = MAPPER.readValue(json, Message.class);
+
+        // When:
+        final SchemaInfo schemas = message.schemas();
+
+        // Then:
+        assertThat(schemas.key(), is(Optional.of(new SchemaRefPart("key.schema"))));
+        assertThat(schemas.value(), is(new SchemaRefPart("value.schema")));
+    }
+
+    @Test
+    void shouldNotBaulkAtInlineSchema() throws Exception {
+        // Given:
+        final String json =
+                "bindings:\n"
+                        + "  kafka:\n"
+                        + "    key:\n"
+                        + "      type: object\n"
+                        + "      properties:\n"
+                        + "        id:\n"
+                        + "payload:\n"
+                        + "  type: object\n"
+                        + "  properties:\n"
+                        + "    name:\n";
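+        // Inline 'object' schemas are tolerated at parse time: they deserialize to the
+        // catch-all OtherPart, as asserted below.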
+
+        final Message message = MAPPER.readValue(json, Message.class);
+
+        // When:
+        final SchemaInfo schemas = message.schemas();
+
+        // Then:
+        assertThat(schemas.key().map(Object::getClass), is(Optional.of(OtherPart.class)));
+        assertThat(schemas.value(), is(instanceOf(OtherPart.class)));
+    }
+
+    @Test
+    void shouldExtractSchemaFormat() throws Exception {
+        // Given:
+        final String json =
+                "bindings:\n"
+                        + "  kafka:\n"
+                        + "    key:\n"
+                        + "      type: long\n"
+                        + "payload:\n"
+                        + "  $ref: value.schema\n"
+                        + "schemaFormat: expected\n";
+
+        final Message message = MAPPER.readValue(json, Message.class);
+
+        // When:
+        final SchemaInfo schemas = message.schemas();
+
+        // Then:
+        assertThat(schemas.schemaFormat(), is(Optional.of("expected")));
+    }
+
+    @Test
+    void shouldExtractSchemaIdLocation() throws Exception {
+        // Given:
+        final String json =
+                "bindings:\n"
+                        + "  kafka:\n"
+                        + "    key:\n"
+                        + "      type: long\n"
+                        + "    schemaIdLocation: expected\n"
+                        + "payload:\n"
+                        + "  $ref: value.schema";
+
+        final Message message = MAPPER.readValue(json, Message.class);
+
+        // When:
+        final SchemaInfo schemas = message.schemas();
+
+        // Then:
+        assertThat(schemas.schemaIdLocation(), is(Optional.of("expected")));
+    }
+
+    @Test
+    void shouldExtractContentType() throws Exception {
+        // Given:
+        final String json =
+                "bindings:\n"
+                        + "  kafka:\n"
+                        + "    key:\n"
+                        + "      type: long\n"
+                        + "payload:\n"
+                        + "  $ref: value.schema\n"
+                        + "contentType: expected\n";
+
+        final Message message = MAPPER.readValue(json, Message.class);
+
+        // When:
+        final SchemaInfo schemas = message.schemas();
+
+        // Then:
+        assertThat(schemas.contentType(), is(Optional.of("expected")));
+    }
+
+    @Test
+    void shouldExtractSchemaLookupStrategy() throws Exception {
+        // Given:
+        final String json =
+                "bindings:\n"
+                        + "  kafka:\n"
+                        + "    key:\n"
+                        + "      type: long\n"
+                        + "    schemaLookupStrategy: expected\n"
+                        + "payload:\n"
+                        + "  $ref: value.schema";
+
+        final Message message = MAPPER.readValue(json, Message.class);
+
+        // When:
+        final SchemaInfo schemas = message.schemas();
+
+        // Then:
+        assertThat(schemas.schemaLookupStrategy(), is(Optional.of("expected")));
+    }
+}
diff --git a/parser/src/test/java/io/specmesh/apiparser/model/RecordPartTest.java b/parser/src/test/java/io/specmesh/apiparser/model/RecordPartTest.java
new file mode 100644
index 00000000..7efba619
--- /dev/null
+++ b/parser/src/test/java/io/specmesh/apiparser/model/RecordPartTest.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.specmesh.apiparser.model;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import io.specmesh.apiparser.AsyncApiParser.APIParserException;
+import io.specmesh.apiparser.model.RecordPart.KafkaType;
+import java.util.Optional;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+class RecordPartTest {
+    private static final JsonMapper MAPPER = JsonMapper.builder().build();
+
+    @Test
+    void shouldThrowIfNotObject() {
+        // Given:
+        final String json = "1";
+
+        // When:
+        final Exception e =
+                assertThrows(
+                        APIParserException.class, () -> MAPPER.readValue(json, RecordPart.class));
+
+        // Then:
+        assertThat(
+                e.getMessage(),
+                startsWith("Record part should be an object with either $ref or type."));
+    }
+
+    @Test
+    void shouldThrowIfArray() {
+        // Given:
+        final String json = "[]";
+
+        // When:
+        final Exception e =
+                assertThrows(
+                        APIParserException.class, () -> MAPPER.readValue(json, RecordPart.class));
+
+        // Then:
+        assertThat(
+                e.getMessage(),
+                startsWith("Record part should be an object with either $ref or type."));
+    }
+
+    @Test
+    void shouldThrowIfEmptyObject() {
+        // Given:
+        final String json = "{}";
+
+        // When:
+        final Exception e =
+                assertThrows(
+                        APIParserException.class, () -> MAPPER.readValue(json, RecordPart.class));
+
+        // Then:
+        assertThat(
+                e.getMessage(),
+                startsWith("Record part requires either $ref or type, but not both"));
+    }
+
+    @Test
+    void shouldThrowOnObjectWithoutProps() {
+        // Given:
+        final String json = "{\"type\": \"object\"}";
+
+        // When:
+        final Exception e =
+                assertThrows(
+                        APIParserException.class, () -> MAPPER.readValue(json, RecordPart.class));
+
+        // Then:
+        assertThat(e.getMessage(), startsWith("'object' types requires inline 'properties."));
+    }
+
+    @Test
+    void shouldThrowIfBothRefAndTypeDefined() {
+        // Given:
+        final String json = "{" + "\"type\": \"string\"," + "\"$ref\": \"/some/path\"" + "}";
+
+        // When:
+        final Exception e =
+                assertThrows(
+                        APIParserException.class, () -> MAPPER.readValue(json, RecordPart.class));
+
+        // Then:
+        assertThat(
+                e.getMessage(),
+                startsWith(
+                        "Record part requires either $ref or type, but not both. location:"
+                                + " unknown"));
+    }
+
+    @Test
+    void shouldThrowOnUnknownKafkaType() {
+        // Given:
+        final String json = "{\"type\": \"not-valid\"}";
+
+        // When:
+        final Exception e =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> MAPPER.readValue(json, RecordPart.class));
+
+        // Then:
+        assertThat(
+                e.getMessage(),
+                containsString(
+                        "Unknown KafkaType: not-valid. Valid values are: "
+                                + "[uuid, long, int, short, float, double, string, bytes, void]"));
+    }
+
+    @ParameterizedTest
+    @EnumSource(KafkaType.class)
+    void shouldDeserializeKafkaType(final KafkaType keyType) throws Exception {
+        // Given:
+        final String json = "{\"type\": \"" + keyType.toString() + "\"}";
+
+        // When:
+        final RecordPart result = MAPPER.readValue(json, RecordPart.class);
+
+        // Then:
+        assertThat(result, is(new RecordPart.KafkaPart(keyType)));
+        assertThat(result.schemaRef(), is(Optional.empty()));
+    }
+
+    @Test
+    void shouldDeserializeSchemaRef() throws Exception {
+        // Given:
+        final String json = "{\"$ref\": \"/some/path/to/schema.avsc\"}";
+
+        // When:
+        final RecordPart result = MAPPER.readValue(json, RecordPart.class);
+
+        // Then:
+        assertThat(result.schemaRef(), is(Optional.of("/some/path/to/schema.avsc")));
+    }
+
+    @Test
+    void shouldDeserializeInlineSchema() throws Exception {
+        // Given:
+        final String json =
+                "{"
+                        + "\"type\": \"object\","
+                        + "\"properties\": {"
+                        + "  \"name\": {"
+                        + "    \"type\": \"string\""
+                        + "  }"
+                        + " }"
+                        + "}";
+
+        // When:
+        final RecordPart result = MAPPER.readValue(json, RecordPart.class);
+
+        // Then:
+        assertThat(result, is(instanceOf(RecordPart.OtherPart.class)));
+        assertThat(result.schemaRef(), is(Optional.empty()));
+    }
+}
diff --git a/parser/src/test/java/io/specmesh/apiparser/util/JsonLocationTest.java b/parser/src/test/java/io/specmesh/apiparser/util/JsonLocationTest.java
new file mode 100644
index 00000000..f037aee4
--- /dev/null
+++ b/parser/src/test/java/io/specmesh/apiparser/util/JsonLocationTest.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.specmesh.apiparser.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.endsWith;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.startsWith;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class JsonLocationTest {
+
+    private static final ObjectMapper MAPPER =
+            new ObjectMapper().enable(JsonParser.Feature.INCLUDE_SOURCE_IN_LOCATION);
+
+    @TempDir Path testDir; // must not be private, or JUnit can not inject it.
+
+    @Test
+    void shouldGetLocationIfParsingFile() throws Exception {
+        // Given:
+        final String json = "{\"k\": \"v\"}";
+        final Path file = testDir.resolve("test.yml");
+        Files.writeString(file, json);
+
+        // When:
+        final TestThing result = MAPPER.readValue(file.toFile(), TestThing.class);
+
+        // Then:
+        final String location = result.location.toString();
+        assertThat(location, startsWith("file:///"));
+        assertThat(location, endsWith("/test.yml:1"));
+    }
+
+    @Test
+    void shouldNotGetLocationIfParsingText() throws Exception {
+        // Given:
+        final String json = "{\"k\": \"v\"}";
+
+        // When:
+        final TestThing result = MAPPER.readValue(json, TestThing.class);
+
+        // Then:
+        assertThat(result.location, is(URI.create("unknown")));
+    }
+
+    @JsonDeserialize(using = JsonLocationTest.ThingDeserializer.class)
+    public static final class TestThing {
+
+        private final URI location;
+
+        TestThing(final URI location) {
+            this.location = location;
+        }
+    }
+
+    public static final class ThingDeserializer extends JsonDeserializer<TestThing> {
+        @Override
+        public TestThing deserialize(final JsonParser p, final DeserializationContext ctx)
+                throws IOException {
+            p.nextFieldName();
+            final URI location = JsonLocation.location(p);
+            p.nextTextValue();
+            return new TestThing(location);
+        }
+    }
+}
diff --git a/parser/src/test/resources/parser_simple_schema_demo-api.yaml b/parser/src/test/resources/parser_simple_schema_demo-api.yaml
index c8e4e583..6186c1ed 100644
--- a/parser/src/test/resources/parser_simple_schema_demo-api.yaml
+++ b/parser/src/test/resources/parser_simple_schema_demo-api.yaml
@@ -34,6 +34,8 @@ channels:
       bindings:
         kafka:
           schemaIdLocation: "payload"
+          key:
+            $ref: "key.avsc"
       schemaFormat: "application/vnd.apache.avro+json;version=1.9.0"
       contentType: "application/octet-stream"
       payload:
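
Reviewer aside (not part of the patch): the sketch below pulls the new key/value surface area together in one place, mirroring what `MessageTest` and `RecordPartTest` exercise. The class name and the YAML literal are illustrative assumptions, not code from this change.

```java
// Illustrative sketch only. Message, SchemaInfo, RecordPart and SpecMapper are the
// types introduced or changed by this patch; KeyMetadataDemo itself is hypothetical.
import io.specmesh.apiparser.model.Message;
import io.specmesh.apiparser.model.RecordPart;
import io.specmesh.apiparser.model.SchemaInfo;
import io.specmesh.apiparser.parse.SpecMapper;

public final class KeyMetadataDemo {

    public static void main(final String[] args) throws Exception {
        // A message definition with a schema-file key, as supported by this change:
        final String yaml =
                "bindings:\n"
                        + "  kafka:\n"
                        + "    key:\n"
                        + "      $ref: key.avsc\n"
                        + "payload:\n"
                        + "  $ref: value.avsc";

        final Message message = SpecMapper.mapper().readValue(yaml, Message.class);

        // schemas() merges binding and payload info; it throws if payload is missing:
        final SchemaInfo schemas = message.schemas();

        // The key part is optional: empty when the spec defines no key.
        schemas.key()
                .flatMap(RecordPart::schemaRef)
                .ifPresent(ref -> System.out.println("key schema ref: " + ref));

        // The value part is mandatory; schemaRef() is empty for inbuilt/inline types:
        System.out.println(
                "value schema ref: " + schemas.value().schemaRef().orElse("inbuilt or inline"));
    }
}
```

Because `key()` comes back as an `Optional`, existing value-only specs keep working: callers simply see an empty key, as `shouldHandleMissingKey` asserts.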