diff --git a/README.md b/README.md index 53c2e92e..7c3ab1a2 100644 --- a/README.md +++ b/README.md @@ -43,12 +43,20 @@ channels: publish: message: + bindings: + kafka: + key: + type: long payload: $ref: "/schema/simple.schema_demo._public.user_signed_up.avsc" _private.user_checkout: publish: message: + bindings: + kafka: + key: + $ref: "/schema/simple.schema_demo._public.user_checkout_key.yml" payload: $ref: "/schema/simple.schema_demo._public.user_checkout.yml" @@ -76,6 +84,8 @@ channels: 2.2. Schema published: - /schema/simple.schema_demo._public.user_signed_up.avsc +- /schema/simple.schema_demo._public.user_checkout_key.yml +- /schema/simple.schema_demo._public.user_checkout.yml 2.3. ACLs created: - "name" : "(pattern=ResourcePattern(resourceType=TOPIC, name=simple.spec_demo._public, patternType=PREFIXED), entry=(principal=User:*, host=*, operation=READ, permissionType=ALLOW))", diff --git a/cli/resources/simple_schema_demo-api.yaml b/cli/resources/simple_schema_demo-api.yaml index 52bc5b0c..f31cb0dc 100644 --- a/cli/resources/simple_schema_demo-api.yaml +++ b/cli/resources/simple_schema_demo-api.yaml @@ -35,6 +35,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" @@ -60,6 +62,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" @@ -87,6 +91,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" diff --git a/cli/src/main/java/io/specmesh/cli/Export.java b/cli/src/main/java/io/specmesh/cli/Export.java index 7e27343f..dff728d1 100644 --- a/cli/src/main/java/io/specmesh/cli/Export.java +++ b/cli/src/main/java/io/specmesh/cli/Export.java @@ -20,10 +20,10 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.specmesh.apiparser.model.ApiSpec; +import io.specmesh.apiparser.parse.SpecMapper; import io.specmesh.kafka.Clients; import io.specmesh.kafka.Exporter; import java.util.Map; @@ -92,7 +92,7 @@ public Integer call() throws Exception { try (Admin adminClient = Clients.adminClient(brokerUrl, username, secret)) { final var apiSpec = Exporter.export(aggid, adminClient); final var mapper = - new ObjectMapper() + SpecMapper.mapper() .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); System.out.println(mapper.writeValueAsString(apiSpec)); diff --git a/cli/src/main/java/io/specmesh/cli/Flatten.java b/cli/src/main/java/io/specmesh/cli/Flatten.java index d8c87093..4c3e3558 100644 --- a/cli/src/main/java/io/specmesh/cli/Flatten.java +++ b/cli/src/main/java/io/specmesh/cli/Flatten.java @@ -20,10 +20,9 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.specmesh.apiparser.parse.SpecMapper; import io.specmesh.kafka.KafkaApiSpec; import java.io.FileOutputStream; import java.nio.charset.StandardCharsets; @@ -73,7 +72,7 @@ public Integer call() throws Exception { final var apiSpec = apiSpec1.apiSpec(); final var mapper = - new ObjectMapper(new YAMLFactory()) + SpecMapper.mapper() .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); final var channels = apiSpec.channels(); diff --git a/cli/src/test/resources/simple_schema_demo-api.yaml b/cli/src/test/resources/simple_schema_demo-api.yaml index 7729b91d..5122e69c 100644 --- a/cli/src/test/resources/simple_schema_demo-api.yaml +++ b/cli/src/test/resources/simple_schema_demo-api.yaml @@ -33,6 +33,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" @@ -58,6 +60,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" @@ -84,6 +88,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" diff --git a/kafka/src/main/java/io/specmesh/kafka/Exporter.java b/kafka/src/main/java/io/specmesh/kafka/Exporter.java index e0d4d175..84d7f2e0 100644 --- a/kafka/src/main/java/io/specmesh/kafka/Exporter.java +++ b/kafka/src/main/java/io/specmesh/kafka/Exporter.java @@ -78,7 +78,7 @@ private static TopicReaders.TopicReader reader( } /** - * Extract the Channel - todo Produce/Consume info + * Extract the Channel * * @param topic - kafka topic config map * @return decorated channel diff --git a/kafka/src/main/java/io/specmesh/kafka/ExporterYamlWriter.java b/kafka/src/main/java/io/specmesh/kafka/ExporterYamlWriter.java index 8dba9b2f..4a2851d4 100644 --- a/kafka/src/main/java/io/specmesh/kafka/ExporterYamlWriter.java +++ b/kafka/src/main/java/io/specmesh/kafka/ExporterYamlWriter.java @@ -17,9 +17,8 @@ package io.specmesh.kafka; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import io.specmesh.apiparser.model.ApiSpec; +import io.specmesh.apiparser.parse.SpecMapper; /** Export the Spec object to its yaml representation */ public class ExporterYamlWriter implements ExporterWriter { @@ -34,7 +33,7 @@ public class ExporterYamlWriter implements ExporterWriter { @Override public String export(final ApiSpec apiSpec) throws Exporter.ExporterException { try { - return new ObjectMapper(new YAMLFactory()).writeValueAsString(apiSpec); + return SpecMapper.mapper().writeValueAsString(apiSpec); } catch (JsonProcessingException e) { throw new Exporter.ExporterException("Failed to convert to YAML", e); } diff --git a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java index 2020b3d3..03a49be6 100644 --- a/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java +++ b/kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java @@ -34,6 +34,8 @@ import io.specmesh.apiparser.AsyncApiParser; import io.specmesh.apiparser.model.ApiSpec; +import io.specmesh.apiparser.model.Channel; +import io.specmesh.apiparser.model.Operation; import io.specmesh.apiparser.model.SchemaInfo; import java.io.FileInputStream; import java.io.InputStream; @@ -42,8 +44,11 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; @@ -151,13 +156,46 @@ public Set requiredAcls() { * @return the schema info. */ public SchemaInfo schemaInfoForTopic(final String topicName) { - final List myTopics = listDomainOwnedTopics(); - myTopics.stream() - .filter(topic -> topic.name().equals(topicName)) - .findFirst() - .orElseThrow(() -> new APIException("Not a domain topic:" + topicName)); + return ownedTopicSchemas(topicName) + .orElseThrow(() -> new APIException("No schema defined for topic: " + topicName)); + } + + /** + * Get info about schemas that are conceptually owned by the supplied {@code topicName}. + * + *

This differs from {@link #topicSchemas} in that it only returned schemas that should be + * registered when provisioning the topic. + * + * @param topicName the name of the topic + * @return stream of the schema info. + */ + public Optional ownedTopicSchemas(final String topicName) { + final Channel channel = apiSpec.channels().get(topicName); + if (channel == null) { + throw new APIException("Unknown topic:" + topicName); + } + + return Optional.ofNullable(channel.publish()).map(Operation::schemaInfo); + } + + /** + * Get schema info for the supplied {@code topicName} + * + *

This differs from {@link #ownedTopicSchemas} in that it returned all known schemas + * associated with the topic. + * + * @param topicName the name of the topic + * @return stream of the schema info. + */ + public Stream topicSchemas(final String topicName) { + final Channel channel = apiSpec.channels().get(topicName); + if (channel == null) { + throw new APIException("Unknown topic:" + topicName); + } - return apiSpec.channels().get(topicName).publish().schemaInfo(); + return Stream.of(channel.publish(), channel.subscribe()) + .filter(Objects::nonNull) + .map(Operation::schemaInfo); } /** diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java index 5843578f..58552657 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java @@ -23,6 +23,7 @@ import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.specmesh.apiparser.model.RecordPart; import io.specmesh.apiparser.model.SchemaInfo; import io.specmesh.kafka.KafkaApiSpec; import io.specmesh.kafka.provision.ExceptionWrapper; @@ -30,10 +31,13 @@ import io.specmesh.kafka.provision.Status; import io.specmesh.kafka.provision.WithState; import io.specmesh.kafka.provision.schema.SchemaReaders.SchemaReader; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -116,39 +120,62 @@ private static SchemaChangeSetCalculators.ChangeSetCalculator calculator( private static List requiredSchemas( final KafkaApiSpec apiSpec, final String baseResourcePath) { return apiSpec.listDomainOwnedTopics().stream() - .filter( - topic -> - apiSpec.apiSpec() - .channels() - .get(topic.name()) - .publish() - .isSchemaRequired()) - .map( - (topic -> { - final var schema = Schema.builder(); - try { - final var schemaInfo = apiSpec.schemaInfoForTopic(topic.name()); - final var schemaRef = schemaInfo.schemaRef(); - final var schemaPath = Paths.get(baseResourcePath, schemaRef); - final var schemas = - new SchemaReaders.FileSystemSchemaReader() - .readLocal(schemaPath.toString()); - schema.schemas(schemas) - .type(schemaInfo.schemaRef()) - .subject( - resolveSubjectName( - topic.name(), schemas, schemaInfo)); - schema.state(CREATE); - - } catch (Provisioner.ProvisioningException ex) { - schema.state(FAILED); - schema.exception(ex); - } - return schema.build(); - })) + .flatMap(topic -> topicSchemas(apiSpec, baseResourcePath, topic.name())) .collect(Collectors.toList()); } + private static Stream topicSchemas( + final KafkaApiSpec apiSpec, final String baseResourcePath, final String topicName) { + return apiSpec.ownedTopicSchemas(topicName).stream() + .flatMap( + si -> + Stream.of( + si.key() + .flatMap(RecordPart::schemaRef) + .map( + schemaRef -> + partSchema( + "key", + schemaRef, + si, + baseResourcePath, + topicName)), + si.value() + .schemaRef() + .map( + schemaRef -> + partSchema( + "value", + schemaRef, + si, + baseResourcePath, + topicName)))) + .flatMap(Optional::stream); + } + + private static Schema partSchema( + final String partName, + final String schemaRef, + final SchemaInfo si, + final String baseResourcePath, + final String topicName) { + final Schema.SchemaBuilder builder = Schema.builder(); + try { + final Path schemaPath = Paths.get(baseResourcePath, schemaRef); + final Collection schemas = + new SchemaReaders.FileSystemSchemaReader().readLocal(schemaPath); + + builder.schemas(schemas) + .type(schemaRef) + .subject(resolveSubjectName(topicName, schemas, si, partName)); + builder.state(CREATE); + } catch (Provisioner.ProvisioningException ex) { + builder.state(FAILED); + builder.exception(ex); + } + return builder.build(); + } + /** * Follow these guidelines for Confluent SR and APICurio * https://docs.confluent.io/platform/6.2/schema-registry/serdes-develop/index.html#referenced-schemas @@ -175,8 +202,10 @@ private static List requiredSchemas( private static String resolveSubjectName( final String topicName, final Collection schemas, - final SchemaInfo schemaInfo) { - final var lookup = schemaInfo.schemaLookupStrategy(); + final SchemaInfo schemaInfo, + final String partName) { + final String lookup = schemaInfo.schemaLookupStrategy().orElse(""); + if (lookup.equalsIgnoreCase("SimpleTopicIdStrategy")) { return topicName; } @@ -202,7 +231,7 @@ private static String resolveSubjectName( return topicName + "-" + ((AvroSchema) next).rawSchema().getFullName(); } - return topicName + "-value"; + return topicName + "-" + partName; } private static boolean isAvro(final ParsedSchema schema) { diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaReaders.java b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaReaders.java index a5fee1f2..ab811ab6 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaReaders.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaReaders.java @@ -31,17 +31,17 @@ import io.specmesh.kafka.provision.Status; import io.specmesh.kafka.provision.schema.SchemaProvisioner.Schema; import io.specmesh.kafka.provision.schema.SchemaProvisioner.SchemaProvisioningException; -import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; import lombok.Data; import lombok.experimental.Accessors; @@ -59,38 +59,32 @@ private SchemaReaders() {} public static final class FileSystemSchemaReader { - public Collection readLocal(final String filePath) { + public Collection readLocal(final Path filePath) { try { - final var schemaContent = Files.readString(Paths.get(filePath)); + final var schemaContent = Files.readString(filePath); final var results = new ArrayList(); - if (filePath.endsWith(".avsc")) { + final String filename = + Optional.ofNullable(filePath.getFileName()) + .map(Objects::toString) + .orElse(""); + if (filename.endsWith(".avsc")) { final var refs = resolveReferencesFor(filePath, schemaContent); results.add( new AvroSchema( schemaContent, refs.references, refs.resolvedReferences, -1)); - } - if (filePath.endsWith(".yml")) { + } else if (filename.endsWith(".yml")) { results.add(new JsonSchema(schemaContent)); - } - if (filePath.endsWith(".proto")) { + } else if (filename.endsWith(".proto")) { results.add(new ProtobufSchema(schemaContent)); + } else { + throw new UnsupportedOperationException("Unsupported schema file: " + filePath); } return results; - } catch (Throwable ex) { - try { - throw new SchemaProvisioningException( - "Failed to load: " - + filePath - + " from: " - + new File(".").getCanonicalFile().getAbsolutePath(), - ex); - } catch (IOException e) { - throw new RuntimeException( - "Failed to read canonical path for file system:" - + new File(".").getAbsolutePath()); - } + } catch (Exception ex) { + throw new SchemaProvisioningException( + "Failed to load: " + filePath + " from: " + filePath.toAbsolutePath(), ex); } } @@ -102,11 +96,11 @@ public Collection readLocal(final String filePath) { * @return */ private SchemaReferences resolveReferencesFor( - final String filePath, final String schemaContent) { + final Path filePath, final String schemaContent) { try { final SchemaReferences results = new SchemaReferences(); final var refs = findJsonNodes(objectMapper.readTree(schemaContent), "subject"); - final var parent = new File(filePath).getParent(); + final var parent = filePath.toFile().getParent(); refs.forEach(ref -> results.add(parent, ref)); return results; } catch (JsonProcessingException e) { @@ -203,7 +197,7 @@ private List resolvePayload(final String type, final String conten private ParsedSchema parsedSchema(final String type, final String payload) { if (type.endsWith(".avsc") || type.equals("AVRO")) { - return new AvroSchema(payload); + return new AvroSchema(payload, List.of(), Map.of(), -1); } if (type.endsWith(".yml") || type.equals("JSON")) { return new JsonSchema(payload); diff --git a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java index 70be8391..853ffd47 100644 --- a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecTest.java @@ -26,6 +26,7 @@ import io.specmesh.test.TestSpecLoader; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.NewTopic; @@ -190,8 +191,9 @@ void shouldNotHaveAnyAdditionalAcls() { @Test public void shouldGetSchemaInfoForOwnedTopics() { - final List newTopics = API_SPEC.listDomainOwnedTopics(); - final SchemaInfo schemaInfo = API_SPEC.schemaInfoForTopic(newTopics.get(0).name()); - assertThat(schemaInfo.schemaIdLocation(), is("header")); + final Optional schema = + API_SPEC.ownedTopicSchemas( + "london.hammersmith.olympia.bigdatalondon._public.attendee"); + assertThat(schema.flatMap(SchemaInfo::schemaIdLocation), is(Optional.of("header"))); } } diff --git a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecWithGrantAccessAclsTest.java b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecWithGrantAccessAclsTest.java index fa61c399..7686fbfc 100644 --- a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecWithGrantAccessAclsTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecWithGrantAccessAclsTest.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.NewTopic; @@ -199,8 +200,8 @@ void shouldNotHaveAnyAdditionalAcls() { @Test public void shouldGetSchemaInfoForOwnedTopics() { - final List newTopics = API_SPEC.listDomainOwnedTopics(); - final SchemaInfo schemaInfo = API_SPEC.schemaInfoForTopic(newTopics.get(0).name()); - assertThat(schemaInfo.schemaIdLocation(), is("header")); + final Optional schema = + API_SPEC.ownedTopicSchemas("london.hammersmith.olympia.bigdatalondon.attendee"); + assertThat(schema.flatMap(SchemaInfo::schemaIdLocation), is(Optional.of("header"))); } } diff --git a/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java index b376391c..01e3eb77 100644 --- a/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java @@ -197,7 +197,7 @@ void shouldDryRunSchemasFromEmptyCluster() throws RestClientException, IOExcepti assertThat( "dry run should leave changeset in 'create' state", changeset.stream().filter(topic -> topic.state() == Status.STATE.CREATE).count(), - is(2L)); + is(3L)); // Verify - should have 2 SR entries final var allSubjects = srClient.getAllSubjects(); @@ -341,12 +341,12 @@ void shouldPublishSchemasFromEmptyCluster() throws RestClientException, IOExcept // Verify - 11 created assertThat( changeset.stream().filter(topic -> topic.state() == Status.STATE.CREATED).count(), - is(2L)); + is(3L)); - // Verify - should have 2 SR entries + // Verify - should have 3 SR entries final var allSubjects = srClient.getAllSubjects(); - assertThat(allSubjects, is(hasSize(2))); + assertThat(allSubjects, is(hasSize(3))); final var schemas = srClient.getSchemas("simple", false, false); @@ -358,6 +358,7 @@ void shouldPublishSchemasFromEmptyCluster() throws RestClientException, IOExcept is( containsInAnyOrder( "io.specmesh.kafka.schema.UserInfo", + "simple.provision_demo._public.user_signed_up_value.key.UserSignedUpKey", "simple.provision_demo._public.user_signed_up_value.UserSignedUp"))); } diff --git a/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerFunctionalTest.java index 9314d001..8e735566 100644 --- a/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerFunctionalTest.java @@ -36,8 +36,11 @@ import io.specmesh.kafka.provision.schema.SchemaProvisioner.Schema; import io.specmesh.test.TestSpecLoader; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; @@ -59,6 +62,7 @@ class SchemaProvisionerFunctionalTest { private static final KafkaApiSpec API_UPDATE_SPEC = TestSpecLoader.loadFromClassPath("provisioner-update-functional-test-api.yaml"); + private SchemaRegistryClient srClient; private enum Domain { /** The domain associated with the spec. */ @@ -87,20 +91,47 @@ private enum Domain { Domain.SELF.domainId + "-secret") .build(); + @BeforeEach + void setUp() { + srClient = KAFKA_ENV.srClient(); + } + + @AfterEach + void tearDown() throws Exception { + srClient.close(); + } + /** setup for following update tests */ @Test @Order(1) - void shouldProvisionExistingSpec() { - SchemaProvisioner.provision( - false, false, API_SPEC, "./build/resources/test", KAFKA_ENV.srClient()); + void shouldProvisionExistingSpec() throws Exception { + // When: + final Collection provisioned = + SchemaProvisioner.provision( + false, false, API_SPEC, "./build/resources/test", srClient); + + // Then: should have provisioned 3 schema: + assertThat( + provisioned.stream().filter(topic -> topic.state() == STATE.CREATED).count(), + is(3L)); + + assertThat( + srClient.getAllSubjects(), + containsInAnyOrder( + "simple.provision_demo._public.user_signed_up-key", + "simple.provision_demo._public.user_signed_up-value", + "simple.provision_demo._protected.user_info-value")); + + assertThat( + srClient.getAllSubjectsByPrefix(API_SPEC.id()), + hasSize(srClient.getAllSubjects().size())); } @Test @Order(2) - void shouldPublishUpdatedSchemas() throws RestClientException, IOException { + void shouldPublishUpdatedSchemas() throws Exception { - final var srClient = KAFKA_ENV.srClient(); - final var dryRunChangeset = + final Collection dryRunChangeset = SchemaProvisioner.provision( true, false, API_UPDATE_SPEC, "./build/resources/test", srClient); @@ -109,12 +140,15 @@ void shouldPublishUpdatedSchemas() throws RestClientException, IOException { dryRunChangeset.stream().filter(topic -> topic.state() == STATE.UPDATE).count(), is(1L)); - // Verify - should have 2 SR entries (1 was updated, 1 was from original spec) - final var allSubjects = srClient.getAllSubjects(); - - assertThat(allSubjects, is(hasSize(2))); + // Verify - should have 3 SR entries (1 was updated, 2 was from original spec) + assertThat( + srClient.getAllSubjects(), + containsInAnyOrder( + "simple.provision_demo._public.user_signed_up-key", + "simple.provision_demo._public.user_signed_up-value", + "simple.provision_demo._protected.user_info-value")); - final var updateChangeset = + final Collection updateChangeset = SchemaProvisioner.provision( false, false, API_UPDATE_SPEC, "./build/resources/test", srClient); @@ -136,6 +170,7 @@ void shouldPublishUpdatedSchemas() throws RestClientException, IOException { is( containsInAnyOrder( "io.specmesh.kafka.schema.UserInfo", + "simple.provision_demo._public.user_signed_up_value.key.UserSignedUpKey", "simple.provision_demo._public.user_signed_up_value.UserSignedUp"))); } @@ -162,45 +197,35 @@ void shouldRemoveUnspecdSchemas() throws RestClientException, IOException { .state(STATE.READ) .build(); - try (SchemaRegistryClient srClient = KAFKA_ENV.srClient()) { + // insert the bad schema + srClient.register(subject, schema.getSchema()); - // insert the bad schema - srClient.register(subject, schema.getSchema()); - - testDryRun(subject, srClient); - testCleanUnSpecSchemas(srClient); - } + testDryRun(subject); + testCleanUnSpecSchemas(); } - private static void testCleanUnSpecSchemas(final SchemaRegistryClient srClient) - throws IOException, RestClientException { + private void testCleanUnSpecSchemas() throws IOException, RestClientException { final var cleanerSet2 = SchemaProvisioner.provision( - false, - true, - API_UPDATE_SPEC, - "./build/resources/test", - KAFKA_ENV.srClient()); + false, true, API_UPDATE_SPEC, "./build/resources/test", srClient); // verify it was removed assertThat(cleanerSet2.iterator().next().state(), is(STATE.DELETED)); - final var allSchemasforSpec = srClient.getAllSubjectsByPrefix(API_SPEC.id()); - // verify removal - assertThat(allSchemasforSpec, is(hasSize(2))); + assertThat( + srClient.getAllSubjectsByPrefix(API_SPEC.id()), + containsInAnyOrder( + "simple.provision_demo._public.user_signed_up-key", + "simple.provision_demo._public.user_signed_up-value", + "simple.provision_demo._protected.user_info-value")); } - private static void testDryRun(final String subject, final SchemaRegistryClient srClient) - throws IOException, RestClientException { + private void testDryRun(final String subject) throws IOException, RestClientException { // test dry run final var cleanerSet = SchemaProvisioner.provision( - true, - true, - API_UPDATE_SPEC, - "./build/resources/test", - KAFKA_ENV.srClient()); + true, true, API_UPDATE_SPEC, "./build/resources/test", srClient); // verify dry run assertThat(cleanerSet, is(hasSize(1))); @@ -211,6 +236,6 @@ private static void testDryRun(final String subject, final SchemaRegistryClient assertThat(cleanerSet.iterator().next().state(), is(STATE.DELETE)); final var allSchemasforId = srClient.getAllSubjectsByPrefix(API_SPEC.id()); - assertThat(allSchemasforId, is(hasSize(3))); + assertThat(allSchemasforId, is(hasSize(4))); } } diff --git a/kafka/src/test/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculatorsTest.java b/kafka/src/test/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculatorsTest.java index eb8297e3..a178028c 100644 --- a/kafka/src/test/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculatorsTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculatorsTest.java @@ -28,6 +28,7 @@ import io.specmesh.kafka.provision.Provisioner; import io.specmesh.kafka.provision.Status; import io.specmesh.kafka.provision.schema.SchemaProvisioner.Schema; +import java.nio.file.Path; import java.util.List; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -82,8 +83,8 @@ void shouldOutputMessagesOnBorkedSchema() throws Exception { assertThat(schemas.iterator().next().messages(), is(containsString("borked"))); } - private static String filename(final String extra) { - return "./build/resources/test/schema/" + SCHEMA_FILENAME + extra; + private static Path filename(final String extra) { + return Path.of("./build/resources/test/schema/" + SCHEMA_FILENAME + extra); } static ParsedSchema getSchema(final String schemaRefType, final String content) { diff --git a/kafka/src/test/resources/bigdatalondon-api-with-grant-access-acls.yaml b/kafka/src/test/resources/bigdatalondon-api-with-grant-access-acls.yaml index 0c0a153c..011c7f0a 100644 --- a/kafka/src/test/resources/bigdatalondon-api-with-grant-access-acls.yaml +++ b/kafka/src/test/resources/bigdatalondon-api-with-grant-access-acls.yaml @@ -71,6 +71,10 @@ channels: tags: [ name: "grant-access:some.other.domain.root" ] + bindings: + kafka: + key: + type: long message: name: Food Item tags: [ @@ -107,6 +111,10 @@ channels: name: "human", name: "customer" ] + bindings: + kafka: + key: + type: long payload: type: object properties: diff --git a/kafka/src/test/resources/bigdatalondon-api.yaml b/kafka/src/test/resources/bigdatalondon-api.yaml index e3000bfa..fb5d44d1 100644 --- a/kafka/src/test/resources/bigdatalondon-api.yaml +++ b/kafka/src/test/resources/bigdatalondon-api.yaml @@ -68,6 +68,10 @@ channels: ] message: name: Food Item + bindings: + kafka: + key: + type: long tags: [ name: "human", name: "purchase" @@ -97,6 +101,10 @@ channels: summary: Humans customers message: name: Food Item + bindings: + kafka: + key: + type: long tags: [ name: "human", name: "customer" @@ -123,6 +131,10 @@ channels: summary: Humans arriving in the borough message: name: Human + bindings: + kafka: + key: + type: long payload: type: object properties: diff --git a/kafka/src/test/resources/clientapi-functional-test-api.yaml b/kafka/src/test/resources/clientapi-functional-test-api.yaml index 699fa43c..0c826d78 100644 --- a/kafka/src/test/resources/clientapi-functional-test-api.yaml +++ b/kafka/src/test/resources/clientapi-functional-test-api.yaml @@ -35,6 +35,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" @@ -62,6 +64,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" diff --git a/kafka/src/test/resources/exporter-expected-spec.yaml b/kafka/src/test/resources/exporter-expected-spec.yaml index f3f15f23..49ad1488 100644 --- a/kafka/src/test/resources/exporter-expected-spec.yaml +++ b/kafka/src/test/resources/exporter-expected-spec.yaml @@ -1,19 +1,14 @@ --- id: "urn:asyncapi-id" version: "version-123" -asyncapi: null channels: one-topic-channel: description: "one-topic-channel-description" bindings: kafka: - envs: [] partitions: 1 replicas: 3 - configs: {} groupId: "kafka-binding-group-id" schemaIdLocation: "payload" schemaLookupStrategy: "TopicNameStrategy" - bindingVersion: null - publish: null - subscribe: null + bindingVersion: "latest" diff --git a/kafka/src/test/resources/provisioner-functional-test-api.yaml b/kafka/src/test/resources/provisioner-functional-test-api.yaml index 12752941..ee2ef44e 100644 --- a/kafka/src/test/resources/provisioner-functional-test-api.yaml +++ b/kafka/src/test/resources/provisioner-functional-test-api.yaml @@ -28,19 +28,19 @@ channels: cleanup.policy: delete retention.ms: 3600000 - publish: summary: Inform about signup operationId: onUserSignedUp message: bindings: kafka: + key: + $ref: schema/simple.provision_demo._public.user_signed_up.key.avsc schemaIdLocation: "payload" schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" payload: $ref: "/schema/simple.provision_demo._public.user_signed_up.avsc" - _protected.user_info: bindings: kafka: @@ -62,6 +62,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" diff --git a/kafka/src/test/resources/provisioner-update-functional-test-api.yaml b/kafka/src/test/resources/provisioner-update-functional-test-api.yaml index 7e77b499..2652e560 100644 --- a/kafka/src/test/resources/provisioner-update-functional-test-api.yaml +++ b/kafka/src/test/resources/provisioner-update-functional-test-api.yaml @@ -34,6 +34,8 @@ channels: message: bindings: kafka: + key: + $ref: schema/simple.provision_demo._public.user_signed_up.key.avsc schemaIdLocation: "payload" schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" @@ -61,6 +63,8 @@ channels: message: bindings: kafka: + key: + type: long schemaIdLocation: "payload" schemaFormat: "application/json;version=1.9.0" contentType: "application/json" diff --git a/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.key.avsc b/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.key.avsc new file mode 100644 index 00000000..ffdb8351 --- /dev/null +++ b/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.key.avsc @@ -0,0 +1,8 @@ +{ + "type": "record", + "namespace": "simple.provision_demo._public.user_signed_up_value.key", + "name": "UserSignedUpKey", + "fields": [ + {"name": "id", "type": "int"} + ] +} \ No newline at end of file diff --git a/parser/build.gradle.kts b/parser/build.gradle.kts index 8e04f2a9..c3fee22c 100644 --- a/parser/build.gradle.kts +++ b/parser/build.gradle.kts @@ -27,6 +27,7 @@ dependencies { api("com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion") implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jacksonVersion") + implementation("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion") compileOnly("org.projectlombok:lombok:$lombokVersion") annotationProcessor("org.projectlombok:lombok:$lombokVersion") diff --git a/parser/src/main/java/io/specmesh/apiparser/AsyncApiParser.java b/parser/src/main/java/io/specmesh/apiparser/AsyncApiParser.java index 2e4a97ce..33951de6 100644 --- a/parser/src/main/java/io/specmesh/apiparser/AsyncApiParser.java +++ b/parser/src/main/java/io/specmesh/apiparser/AsyncApiParser.java @@ -16,9 +16,8 @@ package io.specmesh.apiparser; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import io.specmesh.apiparser.model.ApiSpec; +import io.specmesh.apiparser.parse.SpecMapper; import java.io.IOException; import java.io.InputStream; @@ -36,7 +35,7 @@ public final ApiSpec loadResource(final InputStream inputStream) throws IOExcept if (inputStream == null || inputStream.available() == 0) { throw new RuntimeException("Not found"); } - return new ObjectMapper(new YAMLFactory()).readValue(inputStream, ApiSpec.class); + return SpecMapper.mapper().readValue(inputStream, ApiSpec.class); } public static class APIParserException extends RuntimeException { diff --git a/parser/src/main/java/io/specmesh/apiparser/model/Bindings.java b/parser/src/main/java/io/specmesh/apiparser/model/Bindings.java index b6b40484..0d15a41b 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/Bindings.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/Bindings.java @@ -34,7 +34,7 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class Bindings { - @JsonProperty KafkaBinding kafka; + @JsonProperty private KafkaBinding kafka; public void validate() { kafka.validate(); diff --git a/parser/src/main/java/io/specmesh/apiparser/model/KafkaBinding.java b/parser/src/main/java/io/specmesh/apiparser/model/KafkaBinding.java index 4a22c55c..15d4158c 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/KafkaBinding.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/KafkaBinding.java @@ -19,10 +19,9 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -38,16 +37,15 @@ */ /** Pojo representing a Kafka binding */ +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") @Builder @Data @AllArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE) @Accessors(fluent = true) @JsonIgnoreProperties(ignoreUnknown = true) -@SuppressWarnings({"unchecked"}) @SuppressFBWarnings public class KafkaBinding { - private static final int DAYS_TO_MS = 24 * 60 * 60 * 1000; @Builder.Default @JsonProperty private List envs = List.of(); @@ -57,19 +55,23 @@ public class KafkaBinding { @Builder.Default @JsonProperty private Map configs = Map.of(); - @JsonProperty private String groupId; + @Builder.Default @JsonProperty private String groupId = ""; @Builder.Default @JsonProperty private String schemaIdLocation = "payload"; + @Builder.Default @JsonProperty private String schemaIdPayloadEncoding = ""; + @Builder.Default @JsonProperty private String schemaLookupStrategy = "TopicNameStrategy"; - @JsonProperty private String bindingVersion; + @Builder.Default @JsonProperty private String bindingVersion = "latest"; + + @Builder.Default @JsonProperty private Optional key = Optional.empty(); /** * @return configs */ public Map configs() { - return new LinkedHashMap<>(configs == null ? Collections.emptyMap() : configs); + return Map.copyOf(configs); } /** diff --git a/parser/src/main/java/io/specmesh/apiparser/model/Message.java b/parser/src/main/java/io/specmesh/apiparser/model/Message.java index 1c1a5136..e094fa0a 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/Message.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/Message.java @@ -19,8 +19,10 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.specmesh.apiparser.AsyncApiParser.APIParserException; import java.util.List; import java.util.Map; +import java.util.Optional; import lombok.Value; import lombok.experimental.Accessors; @@ -40,7 +42,7 @@ public final class Message { @JsonProperty private Map headers; - @JsonProperty private Map payload; + @JsonProperty private RecordPart payload; @JsonProperty private Map correlationId; @@ -70,7 +72,7 @@ private Message() { this.tags = List.of(); this.traits = Map.of(); this.messageId = null; - this.payload = Map.of(); + this.payload = null; this.name = null; this.title = null; this.summary = null; @@ -79,9 +81,22 @@ private Message() { } /** - * @return the location of the schema + * @return The schemas in use */ - public String schemaRef() { - return (String) payload.get("$ref"); + public SchemaInfo schemas() { + if (payload == null) { + throw new APIParserException("missing 'message.payload' definition"); + } + + final Optional kafkaBinding = + Optional.ofNullable(bindings).map(Bindings::kafka); + + return new SchemaInfo( + kafkaBinding.flatMap(KafkaBinding::key), + payload, + Optional.ofNullable(schemaFormat), + kafkaBinding.map(KafkaBinding::schemaIdLocation), + Optional.ofNullable(contentType), + kafkaBinding.map(KafkaBinding::schemaLookupStrategy)); } } diff --git a/parser/src/main/java/io/specmesh/apiparser/model/Operation.java b/parser/src/main/java/io/specmesh/apiparser/model/Operation.java index 608e88c2..b39e3176 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/Operation.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/Operation.java @@ -54,10 +54,7 @@ public class Operation { @JsonProperty private String description; - @SuppressWarnings("rawtypes") - @JsonProperty - @Builder.Default - private List tags = List.of(); + @JsonProperty @Builder.Default private List tags = List.of(); @JsonProperty private Bindings bindings; @@ -70,16 +67,13 @@ public class Operation { * @return schema info */ public SchemaInfo schemaInfo() { - if (message.bindings() == null) { + try { + return message.schemas(); + } catch (Exception e) { throw new APIParserException( - "Bindings not found for (publish|subscribe) operation: " + operationId); + "Error extracting schemas from (publish|subscribe) operation: " + operationId, + e); } - return new SchemaInfo( - message().schemaRef(), - message().schemaFormat(), - message.bindings().kafka().schemaIdLocation(), - message().contentType(), - message.bindings().kafka().schemaLookupStrategy()); } public void validate() { @@ -87,10 +81,4 @@ public void validate() { throw new APIParserException("(publish|subscribe) operationId is null"); } } - - public boolean isSchemaRequired() { - return this.message() != null - && this.message().schemaRef() != null - && this.message().schemaRef().length() > 0; - } } diff --git a/parser/src/main/java/io/specmesh/apiparser/model/RecordPart.java b/parser/src/main/java/io/specmesh/apiparser/model/RecordPart.java new file mode 100644 index 00000000..b7513d2a --- /dev/null +++ b/parser/src/main/java/io/specmesh/apiparser/model/RecordPart.java @@ -0,0 +1,211 @@ +/* + * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.specmesh.apiparser.model; + +import static java.util.Objects.requireNonNull; + +import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import io.specmesh.apiparser.AsyncApiParser; +import io.specmesh.apiparser.util.JsonLocation; +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.Value; + +/** + * Metadata describing a part of a Kafka record. + * + *

e.g. the key or value of a record. + */ +@JsonDeserialize(using = RecordPart.Deserializer.class) +public interface RecordPart { + /** + * @return reference to an external schema file. + */ + default Optional schemaRef() { + return Optional.empty(); + } + + default Optional kafkaType() { + return Optional.empty(); + } + + // Types supported by for standard Kafka serializers: + enum KafkaType { + UUID("uuid"), + Long("long"), + Int("int"), + Short("short"), + Float("float"), + Double("double"), + String("string"), + Bytes("bytes"), + Void("void"); + + private static final String VALID_VALUES = + Arrays.stream(values()) + .map(RecordPart.KafkaType::toString) + .collect(Collectors.joining(", ", "[", "]")); + + private final String text; + + KafkaType(final String text) { + this.text = requireNonNull(text, "text"); + } + + @JsonValue + @Override + public String toString() { + return text; + } + + public static KafkaPart.KafkaType fromText(final String text) { + return Arrays.stream(values()) + .filter(t -> t.text.equals(text)) + .findFirst() + .orElseThrow( + () -> + new IllegalArgumentException( + "Unknown KafkaType: " + + text + + ". Valid values are: " + + VALID_VALUES)); + } + } + + @Value + final class KafkaPart implements RecordPart { + + private final KafkaPart.KafkaType kafkaType; + + public KafkaPart(final KafkaPart.KafkaType kafkaType) { + this.kafkaType = requireNonNull(kafkaType, "kafkaType"); + } + + @Override + public Optional kafkaType() { + return Optional.of(kafkaType); + } + } + + @Value + final class SchemaRefPart implements RecordPart { + + private final String schemaRef; + + public SchemaRefPart(final String schemaRef) { + this.schemaRef = requireNonNull(schemaRef, "schemaRef"); + } + + public Optional schemaRef() { + return Optional.of(schemaRef); + } + } + + @Value + final class OtherPart implements RecordPart { + + private final JsonNode node; + + public OtherPart(final JsonNode node) { + this.node = requireNonNull(node, "node"); + } + } + + final class Deserializer extends JsonDeserializer { + + private static final String REF_FIELD = "$ref"; + private static final String TYPE_FIELD = "type"; + + @Override + public RecordPart deserialize(final JsonParser parser, final DeserializationContext ctx) + throws IOException { + final URI location = JsonLocation.location(parser); + + if (parser.currentToken() != JsonToken.START_OBJECT) { + throw new AsyncApiParser.APIParserException( + "Record part should be an object with either " + + REF_FIELD + + " or " + + TYPE_FIELD + + ". location: " + + location); + } + + parser.nextToken(); + + Optional ref = Optional.empty(); + Optional type = Optional.empty(); + Optional props = Optional.empty(); + + while (parser.currentToken() != JsonToken.END_OBJECT) { + final String fieldName = parser.currentName(); + parser.nextToken(); + + switch (fieldName) { + case REF_FIELD: + ref = Optional.of(ctx.readValue(parser, String.class)); + break; + case TYPE_FIELD: + type = Optional.of(ctx.readValue(parser, String.class)); + break; + case "properties": + props = Optional.of(ctx.readTree(parser)); + break; + default: + ctx.readTree(parser); + break; + } + + parser.nextToken(); + } + + if (ref.isPresent() == type.isPresent()) { + throw new AsyncApiParser.APIParserException( + "Record part requires either " + + REF_FIELD + + " or " + + TYPE_FIELD + + ", but not both. location: " + + location); + } + + if (ref.isPresent()) { + return new SchemaRefPart(ref.get()); + } + + if (!"object".equals(type.get())) { + return new KafkaPart(KafkaType.fromText(type.get())); + } + + return new OtherPart( + props.orElseThrow( + () -> + new AsyncApiParser.APIParserException( + "'object' types requires inline 'properties. location: " + + location))); + } + } +} diff --git a/parser/src/main/java/io/specmesh/apiparser/model/SchemaInfo.java b/parser/src/main/java/io/specmesh/apiparser/model/SchemaInfo.java index d2512d5c..58293a94 100644 --- a/parser/src/main/java/io/specmesh/apiparser/model/SchemaInfo.java +++ b/parser/src/main/java/io/specmesh/apiparser/model/SchemaInfo.java @@ -18,33 +18,39 @@ import static java.util.Objects.requireNonNull; +import java.util.Optional; import lombok.Value; import lombok.experimental.Accessors; /** Pojo representing a schmea */ +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") @Value @Accessors(fluent = true) public class SchemaInfo { - private final String schemaRef; - private final String schemaFormat; - private final String schemaIdLocation; - private final String contentType; - private String schemaLookupStrategy; + private final Optional key; + private final RecordPart value; + private final Optional schemaFormat; + private final Optional schemaIdLocation; + private final Optional contentType; + private final Optional schemaLookupStrategy; /** - * @param schemaRef location of schema + * @param key schema info for the key of the Kafka Record + * @param value schema info for the value of the Kafka Record * @param schemaFormat format of schema * @param schemaIdLocation header || payload * @param contentType content type of schema * @param schemaLookupStrategy schema lookup strategy */ public SchemaInfo( - final String schemaRef, - final String schemaFormat, - final String schemaIdLocation, - final String contentType, - final String schemaLookupStrategy) { - this.schemaRef = schemaRef; + final Optional key, + final RecordPart value, + final Optional schemaFormat, + final Optional schemaIdLocation, + final Optional contentType, + final Optional schemaLookupStrategy) { + this.key = requireNonNull(key, "key"); + this.value = requireNonNull(value, "value"); this.schemaFormat = requireNonNull(schemaFormat, "schemaFormat"); this.schemaIdLocation = requireNonNull(schemaIdLocation, "schemaIdLocation"); this.contentType = requireNonNull(contentType, "contentType"); diff --git a/parser/src/main/java/io/specmesh/apiparser/parse/SpecMapper.java b/parser/src/main/java/io/specmesh/apiparser/parse/SpecMapper.java new file mode 100644 index 00000000..9afb6441 --- /dev/null +++ b/parser/src/main/java/io/specmesh/apiparser/parse/SpecMapper.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.specmesh.apiparser.parse; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; + +/** ObjectMapper for use (de)serializing openapi / specmesh specs. */ +public final class SpecMapper { + + private SpecMapper() {} + + public static JsonMapper mapper() { + return JsonMapper.builder(new YAMLFactory()) + .addModule(new Jdk8Module()) + .serializationInclusion(JsonInclude.Include.NON_EMPTY) + .build(); + } +} diff --git a/parser/src/main/java/io/specmesh/apiparser/util/JsonLocation.java b/parser/src/main/java/io/specmesh/apiparser/util/JsonLocation.java new file mode 100644 index 00000000..fa2a55cf --- /dev/null +++ b/parser/src/main/java/io/specmesh/apiparser/util/JsonLocation.java @@ -0,0 +1,61 @@ +/* + * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.specmesh.apiparser.util; + +import com.fasterxml.jackson.core.JsonParser; +import java.io.File; +import java.net.URI; + +/** + * Util class for determining the current location within a JSON/YAML file being parsed. + * + *

Locations are in the URI form: {@code file:///path/to/file.yml:}. + */ +public final class JsonLocation { + + private static final URI UNKNOWN = URI.create("unknown"); + + private JsonLocation() {} + + /** + * Get the current location from the parser. + * + * @param parser the parser. + * @return the current location. + */ + public static URI location(final JsonParser parser) { + return location(parser.currentLocation()); + } + + /** + * Get the location from the Jackson location + * + * @param location the Jackson location + * @return the location. + */ + public static URI location(final com.fasterxml.jackson.core.JsonLocation location) { + final Object content = location.contentReference().getRawContent(); + if (!(content instanceof File)) { + return UNKNOWN; + } + + final String filePath = + ((File) content).toURI().toString().replaceFirst("file:/", "file:///"); + + return URI.create(filePath + ":" + location.getLineNr()); + } +} diff --git a/parser/src/test/java/io/specmesh/apiparser/AsyncApiSchemaParserTest.java b/parser/src/test/java/io/specmesh/apiparser/AsyncApiSchemaParserTest.java index e2b709a5..4b1a1652 100644 --- a/parser/src/test/java/io/specmesh/apiparser/AsyncApiSchemaParserTest.java +++ b/parser/src/test/java/io/specmesh/apiparser/AsyncApiSchemaParserTest.java @@ -24,7 +24,11 @@ import io.specmesh.apiparser.model.ApiSpec; import io.specmesh.apiparser.model.Channel; import io.specmesh.apiparser.model.Operation; +import io.specmesh.apiparser.model.RecordPart; +import io.specmesh.apiparser.model.RecordPart.KafkaPart; +import io.specmesh.apiparser.model.RecordPart.SchemaRefPart; import java.util.Map; +import java.util.Optional; import org.junit.jupiter.api.Test; public class AsyncApiSchemaParserTest { @@ -40,36 +44,55 @@ public void shouldReturnProducerMessageSchema() { channelsMap.keySet(), hasItem(".simple.schema-demo._public.user.signed")); - final Operation publish = - channelsMap.get(".simple.schema-demo._public.user.signed").publish(); - assertThat(publish.message().schemaRef(), is("simple_schema_demo_user-signedup.avsc")); + final Operation op = channelsMap.get(".simple.schema-demo._public.user.signed").publish(); assertThat( - publish.message().schemaFormat(), - is("application/vnd.apache.avro+json;version=1.9.0")); - assertThat(publish.message().contentType(), is("application/octet-stream")); - assertThat(publish.message().bindings().kafka().schemaIdLocation(), is("payload")); - assertThat(publish.schemaInfo().schemaIdLocation(), is("payload")); + op.message().schemaFormat(), is("application/vnd.apache.avro+json;version=1.9.0")); + assertThat(op.message().contentType(), is("application/octet-stream")); + assertThat(op.message().bindings().kafka().schemaIdLocation(), is("payload")); + assertThat( + op.message().bindings().kafka().key(), + is(Optional.of(new SchemaRefPart("key.avsc")))); + assertThat( + op.message().payload().schemaRef(), + is(Optional.of("simple_schema_demo_user-signedup.avsc"))); + + assertThat(op.schemaInfo().schemaFormat(), is(Optional.of(op.message().schemaFormat()))); + assertThat(op.schemaInfo().contentType(), is(Optional.of(op.message().contentType()))); + assertThat( + op.schemaInfo().schemaIdLocation(), + is(Optional.of(op.message().bindings().kafka().schemaIdLocation()))); + assertThat(op.schemaInfo().key(), is(op.message().bindings().kafka().key())); + assertThat(op.schemaInfo().value(), is(op.message().payload())); } @Test public void shouldReturnSubscriberMessageSchema() { + // Given: final Map channelsMap = API_SPEC.channels(); - assertThat(channelsMap.entrySet(), hasSize(2)); - assertThat( - "Should have assembled 'id + channelname'", - channelsMap.keySet(), - hasItem("london.hammersmith.transport._public.tube")); - final Operation subscribe = + // When: + final Operation op = channelsMap.get("london.hammersmith.transport._public.tube").subscribe(); + + // Then: assertThat( - subscribe.message().schemaRef(), - is("london_hammersmith_transport_public_passenger.avsc")); + op.message().schemaFormat(), is("application/vnd.apache.avro+json;version=1.9.0")); + assertThat(op.message().contentType(), is("application/octet-stream")); + assertThat(op.message().bindings().kafka().schemaIdLocation(), is("header")); + assertThat( + op.message().bindings().kafka().key(), + is(Optional.of(new KafkaPart(RecordPart.KafkaType.String)))); + assertThat( + op.message().payload().schemaRef(), + is(Optional.of("london_hammersmith_transport_public_passenger.avsc"))); + + assertThat(op.schemaInfo().schemaFormat(), is(Optional.of(op.message().schemaFormat()))); + assertThat(op.schemaInfo().contentType(), is(Optional.of(op.message().contentType()))); assertThat( - subscribe.message().schemaFormat(), - is("application/vnd.apache.avro+json;version=1.9.0")); - assertThat(subscribe.message().contentType(), is("application/octet-stream")); - assertThat(subscribe.message().bindings().kafka().schemaIdLocation(), is("header")); + op.schemaInfo().schemaIdLocation(), + is(Optional.of(op.message().bindings().kafka().schemaIdLocation()))); + assertThat(op.schemaInfo().key(), is(op.message().bindings().kafka().key())); + assertThat(op.schemaInfo().value(), is(op.message().payload())); } } diff --git a/parser/src/test/java/io/specmesh/apiparser/model/MessageTest.java b/parser/src/test/java/io/specmesh/apiparser/model/MessageTest.java new file mode 100644 index 00000000..6f18c18c --- /dev/null +++ b/parser/src/test/java/io/specmesh/apiparser/model/MessageTest.java @@ -0,0 +1,244 @@ +/* + * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.specmesh.apiparser.model; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertThrows; + +import com.fasterxml.jackson.databind.json.JsonMapper; +import io.specmesh.apiparser.AsyncApiParser.APIParserException; +import io.specmesh.apiparser.model.RecordPart.KafkaPart; +import io.specmesh.apiparser.model.RecordPart.OtherPart; +import io.specmesh.apiparser.model.RecordPart.SchemaRefPart; +import io.specmesh.apiparser.parse.SpecMapper; +import java.util.Optional; +import org.junit.jupiter.api.Test; + +class MessageTest { + + private static final JsonMapper MAPPER = SpecMapper.mapper(); + + @Test + void shouldHandleMissingKey() throws Exception { + // Given: + final String json = "message.bindings:\n" + " kafka:\n" + "payload:\n" + " type: string"; + + final Message message = MAPPER.readValue(json, Message.class); + + // When: + final SchemaInfo schemas = message.schemas(); + + // Then: + assertThat(schemas.key(), is(Optional.empty())); + } + + @Test + void shouldHandleMissingKafkaBindings() throws Exception { + // Given: + final String json = "bindings:\n" + "payload:\n" + " type: string"; + + final Message message = MAPPER.readValue(json, Message.class); + + // When: + final SchemaInfo schemas = message.schemas(); + + // Then: + assertThat(schemas.key(), is(Optional.empty())); + } + + @Test + void shouldHandleMissingBindings() throws Exception { + // Given: + final String json = "payload:\n" + " type: string"; + + final Message message = MAPPER.readValue(json, Message.class); + + // When: + final SchemaInfo schemas = message.schemas(); + + // Then: + assertThat(schemas.key(), is(Optional.empty())); + } + + @Test + void shouldThrowOnMissingPayload() throws Exception { + // Given: + final String json = "bindings:\n" + " kafka:\n" + " key:\n" + " type: long"; + + final Message message = MAPPER.readValue(json, Message.class); + + // When: + final Exception e = assertThrows(APIParserException.class, message::schemas); + + // Then: + assertThat(e.getMessage(), startsWith("missing 'message.payload' definition")); + } + + @Test + void shouldExtractInbuiltKafkaSchemaInfo() throws Exception { + // Given: + final String json = + "bindings:\n" + + " kafka:\n" + + " key:\n" + + " type: long\n" + + "payload:\n" + + " type: string"; + + final Message message = MAPPER.readValue(json, Message.class); + + // When: + final SchemaInfo schemas = message.schemas(); + + // Then: + assertThat(schemas.key(), is(Optional.of(new KafkaPart(RecordPart.KafkaType.Long)))); + assertThat(schemas.value(), is(new KafkaPart(RecordPart.KafkaType.String))); + } + + @Test + void shouldExtractSchemaRefInfo() throws Exception { + // Given: + final String json = + "bindings:\n" + + " kafka:\n" + + " key:\n" + + " $ref: key.schema\n" + + "payload:\n" + + " $ref: value.schema"; + + final Message message = MAPPER.readValue(json, Message.class); + + // When: + final SchemaInfo schemas = message.schemas(); + + // Then: + assertThat(schemas.key(), is(Optional.of(new SchemaRefPart("key.schema")))); + assertThat(schemas.value(), is(new SchemaRefPart("value.schema"))); + } + + @Test + void shouldNotBaulkAtInlineSchema() throws Exception { + // Given: + final String json = + "bindings:\n" + + " kafka:\n" + + " key:\n" + + " type: object\n" + + " properties:\n" + + " id:\n" + + "payload:\n" + + " type: object\n" + + " properties:\n" + + " name:\n"; + + final Message message = MAPPER.readValue(json, Message.class); + + // When: + final SchemaInfo schemas = message.schemas(); + + // Then: + assertThat(schemas.key().map(Object::getClass), is(Optional.of(OtherPart.class))); + assertThat(schemas.value(), is(instanceOf(OtherPart.class))); + } + + @Test + void shouldExtractSchemaFormat() throws Exception { + // Given: + final String json = + "bindings:\n" + + " kafka:\n" + + " key:\n" + + " type: long\n" + + "payload:\n" + + " $ref: value.schema\n" + + "schemaFormat: expected\n"; + + final Message message = MAPPER.readValue(json, Message.class); + + // When: + final SchemaInfo schemas = message.schemas(); + + // Then: + assertThat(schemas.schemaFormat(), is(Optional.of("expected"))); + } + + @Test + void shouldExtractSchemaIdLocation() throws Exception { + // Given: + final String json = + "bindings:\n" + + " kafka:\n" + + " key:\n" + + " type: long\n" + + " schemaIdLocation: expected\n" + + "payload:\n" + + " $ref: value.schema"; + + final Message message = MAPPER.readValue(json, Message.class); + + // When: + final SchemaInfo schemas = message.schemas(); + + // Then: + assertThat(schemas.schemaIdLocation(), is(Optional.of("expected"))); + } + + @Test + void shouldExtractContentType() throws Exception { + // Given: + final String json = + "bindings:\n" + + " kafka:\n" + + " key:\n" + + " type: long\n" + + "payload:\n" + + " $ref: value.schema\n" + + "contentType: expected\n"; + + final Message message = MAPPER.readValue(json, Message.class); + + // When: + final SchemaInfo schemas = message.schemas(); + + // Then: + assertThat(schemas.contentType(), is(Optional.of("expected"))); + } + + @Test + void shouldExtractSchemaLookupStrategy() throws Exception { + // Given: + final String json = + "bindings:\n" + + " kafka:\n" + + " key:\n" + + " type: long\n" + + " schemaLookupStrategy: expected\n" + + "payload:\n" + + " $ref: value.schema"; + + final Message message = MAPPER.readValue(json, Message.class); + + // When: + final SchemaInfo schemas = message.schemas(); + + // Then: + assertThat(schemas.schemaLookupStrategy(), is(Optional.of("expected"))); + } +} diff --git a/parser/src/test/java/io/specmesh/apiparser/model/RecordPartTest.java b/parser/src/test/java/io/specmesh/apiparser/model/RecordPartTest.java new file mode 100644 index 00000000..7efba619 --- /dev/null +++ b/parser/src/test/java/io/specmesh/apiparser/model/RecordPartTest.java @@ -0,0 +1,182 @@ +/* + * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.specmesh.apiparser.model; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertThrows; + +import com.fasterxml.jackson.databind.json.JsonMapper; +import io.specmesh.apiparser.AsyncApiParser.APIParserException; +import io.specmesh.apiparser.model.RecordPart.KafkaType; +import java.util.Optional; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +class RecordPartTest { + private static final JsonMapper MAPPER = JsonMapper.builder().build(); + + @Test + void shouldThrowIfNotObject() { + // Given: + final String json = "1"; + + // When: + final Exception e = + assertThrows( + APIParserException.class, () -> MAPPER.readValue(json, RecordPart.class)); + + // Then: + assertThat( + e.getMessage(), + startsWith("Record part should be an object with either $ref or type.")); + } + + @Test + void shouldThrowIfArray() { + // Given: + final String json = "[]"; + + // When: + final Exception e = + assertThrows( + APIParserException.class, () -> MAPPER.readValue(json, RecordPart.class)); + + // Then: + assertThat( + e.getMessage(), + startsWith("Record part should be an object with either $ref or type.")); + } + + @Test + void shouldThrowIfEmptyObject() { + // Given: + final String json = "{}"; + + // When: + final Exception e = + assertThrows( + APIParserException.class, () -> MAPPER.readValue(json, RecordPart.class)); + + // Then: + assertThat( + e.getMessage(), + startsWith("Record part requires either $ref or type, but not both")); + } + + @Test + void shouldThrowOnObjectWithoutProps() { + // Given: + final String json = "{\"type\": \"object\"}"; + + // When: + final Exception e = + assertThrows( + APIParserException.class, () -> MAPPER.readValue(json, RecordPart.class)); + + // Then: + assertThat(e.getMessage(), startsWith("'object' types requires inline 'properties.")); + } + + @Test + void shouldThrowIfBothRefAndTypeDefined() { + // Given: + final String json = "{" + "\"type\": \"string\"," + "\"$ref\": \"/some/path\"" + "}"; + + // When: + final Exception e = + assertThrows( + APIParserException.class, () -> MAPPER.readValue(json, RecordPart.class)); + + // Then: + assertThat( + e.getMessage(), + startsWith( + "Record part requires either $ref or type, but not both. location:" + + " unknown")); + } + + @Test + void shouldThrowOnUnknownKafkaType() { + // Given: + final String json = "{\"type\": \"not-valid\"}"; + + // When: + final Exception e = + assertThrows( + IllegalArgumentException.class, + () -> MAPPER.readValue(json, RecordPart.class)); + + // Then: + assertThat( + e.getMessage(), + containsString( + "Unknown KafkaType: not-valid. Valid values are: " + + "[uuid, long, int, short, float, double, string, bytes, void]")); + } + + @ParameterizedTest + @EnumSource(KafkaType.class) + void shouldDeserializeKafkaType(final KafkaType keyType) throws Exception { + // Given: + final String json = "{\"type\": \"" + keyType.toString() + "\"}"; + + // When: + final RecordPart result = MAPPER.readValue(json, RecordPart.class); + + // Then: + assertThat(result, is(new RecordPart.KafkaPart(keyType))); + assertThat(result.schemaRef(), is(Optional.empty())); + } + + @Test + void shouldDeserializeSchemaRef() throws Exception { + // Given: + final String json = "{\"$ref\": \"/some/path/to/schema.avsc\"}"; + + // When: + final RecordPart result = MAPPER.readValue(json, RecordPart.class); + + // Then: + assertThat(result.schemaRef(), is(Optional.of("/some/path/to/schema.avsc"))); + } + + @Test + void shouldDeserializeInlineSchema() throws Exception { + // Given: + final String json = + "{" + + "\"type\": \"object\"," + + "\"properties\": {" + + " \"name\": {" + + " \"type\": \"string\"" + + " }" + + " }" + + "}"; + + // When: + final RecordPart result = MAPPER.readValue(json, RecordPart.class); + + // Then: + assertThat(result, is(instanceOf(RecordPart.OtherPart.class))); + assertThat(result.schemaRef(), is(Optional.empty())); + } +} diff --git a/parser/src/test/java/io/specmesh/apiparser/util/JsonLocationTest.java b/parser/src/test/java/io/specmesh/apiparser/util/JsonLocationTest.java new file mode 100644 index 00000000..f037aee4 --- /dev/null +++ b/parser/src/test/java/io/specmesh/apiparser/util/JsonLocationTest.java @@ -0,0 +1,91 @@ +/* + * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.specmesh.apiparser.util; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class JsonLocationTest { + + private static final ObjectMapper MAPPER = + new ObjectMapper().enable(JsonParser.Feature.INCLUDE_SOURCE_IN_LOCATION); + + @TempDir private Path testDir; + + @Test + void shouldGetLocationIfParsingFile() throws Exception { + // Given: + final String json = "{\"k\": \"v\"}"; + final Path file = testDir.resolve("test.yml"); + Files.writeString(file, json); + + // When: + final TestThing result = MAPPER.readValue(file.toFile(), TestThing.class); + + // Then: + final String location = result.location.toString(); + assertThat(location, startsWith("file:///")); + assertThat(location, endsWith("/test.yml:1")); + } + + @Test + void shouldNotGetLocationIfParsingText() throws Exception { + // Given: + final String json = "{\"k\": \"v\"}"; + + // When: + final TestThing result = MAPPER.readValue(json, TestThing.class); + + // Then: + assertThat(result.location, is(URI.create("unknown"))); + } + + @JsonDeserialize(using = JsonLocationTest.ThingDeserializer.class) + public static final class TestThing { + + private final URI location; + + TestThing(final URI location) { + this.location = location; + } + } + + public static final class ThingDeserializer extends JsonDeserializer { + @Override + public TestThing deserialize(final JsonParser p, final DeserializationContext ctx) + throws IOException { + p.nextFieldName(); + final URI location = JsonLocation.location(p); + p.nextTextValue(); + return new TestThing(location); + } + } +} diff --git a/parser/src/test/resources/parser_simple_schema_demo-api.yaml b/parser/src/test/resources/parser_simple_schema_demo-api.yaml index c8e4e583..6186c1ed 100644 --- a/parser/src/test/resources/parser_simple_schema_demo-api.yaml +++ b/parser/src/test/resources/parser_simple_schema_demo-api.yaml @@ -34,6 +34,8 @@ channels: bindings: kafka: schemaIdLocation: "payload" + key: + $ref: "key.avsc" schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" payload: