diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java index 2a5139e0..f8a3fcac 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java @@ -34,6 +34,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -78,7 +79,15 @@ public static Collection provision( } final var schemas = calculator(client, cleanUnspecified).calculate(existing, required); - return mutator(dryRun, cleanUnspecified, client).mutate(schemas); + return mutator(dryRun, cleanUnspecified, client).mutate(sortByReferences(schemas)); + } + + private static Collection sortByReferences(final Collection schemas) { + return schemas.stream() + .sorted( + Comparator.comparingInt( + o -> o.schemas.iterator().next().references().size())) + .collect(Collectors.toList()); } /** diff --git a/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerReferenceTest.java b/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerReferenceTest.java index f749e661..60d02a7e 100644 --- a/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerReferenceTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerReferenceTest.java @@ -23,6 +23,7 @@ import io.specmesh.kafka.KafkaEnvironment; import io.specmesh.kafka.provision.schema.SchemaProvisioner; import io.specmesh.test.TestSpecLoader; +import java.util.ArrayList; import org.hamcrest.Matchers; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; @@ -43,6 +44,10 @@ class SchemaProvisionerReferenceTest { private static final KafkaApiSpec API_SPEC = TestSpecLoader.loadFromClassPath("schema-ref/com.example.trading-api.yml"); + private static final KafkaApiSpec SPEC_WITH_REFS_API_SPEC = + TestSpecLoader.loadFromClassPath( + "schema-ref/com.example.single-spec-with-refs-api.yml"); + private static final String ADMIN_USER = "admin"; @RegisterExtension @@ -76,7 +81,7 @@ void shouldProvisionSpecWithMissingRefToCurrency() { /** publish common schema (via api) and also domain-owned schema */ @Test @Order(2) - void shouldProvisionSpecWithRefs() { + void shouldProvisionTwoSpecsWithRefs() { final var provisionCommon = SchemaProvisioner.provision( @@ -106,4 +111,43 @@ void shouldProvisionSpecWithRefs() { schemaState.state(), Matchers.is(Status.STATE.CREATED)); } + + @Test + @Order(3) + void shouldProvisionSpecsWithMultipleRefsSoThatZeroRefsRegisterFirst() { + + final var provisionBothSchemas = + SchemaProvisioner.provision( + false, + false, + SPEC_WITH_REFS_API_SPEC, + "./src/test/resources/schema-ref", + KAFKA_ENV.srClient()); + + final var schemas = new ArrayList<>(provisionBothSchemas); + + assertThat(schemas, Matchers.hasSize(2)); + + final var currencySchemaState = schemas.get(0); + assertThat( + "Failed to load with:" + currencySchemaState.subject(), + currencySchemaState.subject(), + Matchers.is("com.example.shared.Currency")); + + assertThat( + "Failed to load with:" + currencySchemaState, + currencySchemaState.state(), + Matchers.is(Status.STATE.CREATED)); + + final var tradeSchemaState = schemas.get(1); + assertThat( + "Failed to load with:" + tradeSchemaState.subject(), + tradeSchemaState.subject(), + Matchers.is("com.example.refs._public.trade-value")); + + assertThat( + "Failed to load with:" + tradeSchemaState, + tradeSchemaState.state(), + Matchers.is(Status.STATE.CREATED)); + } } diff --git a/kafka/src/test/resources/schema-ref/com.example.single-spec-with-refs-api.yml b/kafka/src/test/resources/schema-ref/com.example.single-spec-with-refs-api.yml new file mode 100644 index 00000000..a47bc1bc --- /dev/null +++ b/kafka/src/test/resources/schema-ref/com.example.single-spec-with-refs-api.yml @@ -0,0 +1,58 @@ +asyncapi: '2.4.0' +id: 'urn:com.example.refs' +info: + title: Common Data Set + version: '1.0.0' + description: | + Contains both a TRADE and Currency - where Trade --> Currency, it will provision both - but should determine the ref and process the 0 refs first +servers: + mosquitto: + url: mqtt://test.mosquitto.org + protocol: kafka +channels: + _public.trade: + bindings: + kafka: + envs: + - staging + - prod + partitions: 3 + replicas: 1 + configs: + cleanup.policy: delete + retention.ms: 999000 + publish: + summary: Trade feed + description: Doing clever things + operationId: onTrade received + message: + bindings: + kafka: + schemaIdLocation: "header" + key: + type: string + + schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" + contentType: "application/octet-stream" + payload: + $ref: "/schema/com.example.trading.Trade.avsc" + _public.currency: + bindings: + kafka: + configs: + retention.ms: 999000 + publish: + summary: Currency things + operationId: onCurrencyUpdate + message: + schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" + contentType: "application/octet-stream" + bindings: + kafka: + schemaIdLocation: "header" + schemaLookupStrategy: "RecordNameStrategy" + key: + type: string + payload: + $ref: "/schema/com.example.shared.Currency.avsc" +