Skip to content

Commit

Permalink
#406 ensure schema references are ordered correctly (#407)
Browse files Browse the repository at this point in the history
* #406 ensure schema references are ordered correctly. Added test to load a spec with Currency and also Trade - currency is ref'd by Trade, the sort will ensure that '0' ref schemas register first. hard to test due to channel names being stored in a map

* remove test ordering

* undo remove test ordering
  • Loading branch information
navery-max authored Oct 17, 2024
1 parent 3bec34a commit b8be16d
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -78,7 +79,15 @@ public static Collection<Schema> provision(
}

final var schemas = calculator(client, cleanUnspecified).calculate(existing, required);
return mutator(dryRun, cleanUnspecified, client).mutate(schemas);
return mutator(dryRun, cleanUnspecified, client).mutate(sortByReferences(schemas));
}

private static Collection<Schema> sortByReferences(final Collection<Schema> schemas) {
return schemas.stream()
.sorted(
Comparator.comparingInt(
o -> o.schemas.iterator().next().references().size()))
.collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.specmesh.kafka.KafkaEnvironment;
import io.specmesh.kafka.provision.schema.SchemaProvisioner;
import io.specmesh.test.TestSpecLoader;
import java.util.ArrayList;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
Expand All @@ -43,6 +44,10 @@ class SchemaProvisionerReferenceTest {
private static final KafkaApiSpec API_SPEC =
TestSpecLoader.loadFromClassPath("schema-ref/com.example.trading-api.yml");

private static final KafkaApiSpec SPEC_WITH_REFS_API_SPEC =
TestSpecLoader.loadFromClassPath(
"schema-ref/com.example.single-spec-with-refs-api.yml");

private static final String ADMIN_USER = "admin";

@RegisterExtension
Expand Down Expand Up @@ -76,7 +81,7 @@ void shouldProvisionSpecWithMissingRefToCurrency() {
/** publish common schema (via api) and also domain-owned schema */
@Test
@Order(2)
void shouldProvisionSpecWithRefs() {
void shouldProvisionTwoSpecsWithRefs() {

final var provisionCommon =
SchemaProvisioner.provision(
Expand Down Expand Up @@ -106,4 +111,43 @@ void shouldProvisionSpecWithRefs() {
schemaState.state(),
Matchers.is(Status.STATE.CREATED));
}

@Test
@Order(3)
void shouldProvisionSpecsWithMultipleRefsSoThatZeroRefsRegisterFirst() {

final var provisionBothSchemas =
SchemaProvisioner.provision(
false,
false,
SPEC_WITH_REFS_API_SPEC,
"./src/test/resources/schema-ref",
KAFKA_ENV.srClient());

final var schemas = new ArrayList<>(provisionBothSchemas);

assertThat(schemas, Matchers.hasSize(2));

final var currencySchemaState = schemas.get(0);
assertThat(
"Failed to load with:" + currencySchemaState.subject(),
currencySchemaState.subject(),
Matchers.is("com.example.shared.Currency"));

assertThat(
"Failed to load with:" + currencySchemaState,
currencySchemaState.state(),
Matchers.is(Status.STATE.CREATED));

final var tradeSchemaState = schemas.get(1);
assertThat(
"Failed to load with:" + tradeSchemaState.subject(),
tradeSchemaState.subject(),
Matchers.is("com.example.refs._public.trade-value"));

assertThat(
"Failed to load with:" + tradeSchemaState,
tradeSchemaState.state(),
Matchers.is(Status.STATE.CREATED));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
asyncapi: '2.4.0'
id: 'urn:com.example.refs'
info:
title: Common Data Set
version: '1.0.0'
description: |
Contains both a TRADE and Currency - where Trade --> Currency, it will provision both - but should determine the ref and process the 0 refs first
servers:
mosquitto:
url: mqtt://test.mosquitto.org
protocol: kafka
channels:
_public.trade:
bindings:
kafka:
envs:
- staging
- prod
partitions: 3
replicas: 1
configs:
cleanup.policy: delete
retention.ms: 999000
publish:
summary: Trade feed
description: Doing clever things
operationId: onTrade received
message:
bindings:
kafka:
schemaIdLocation: "header"
key:
type: string

schemaFormat: "application/vnd.apache.avro+json;version=1.9.0"
contentType: "application/octet-stream"
payload:
$ref: "/schema/com.example.trading.Trade.avsc"
_public.currency:
bindings:
kafka:
configs:
retention.ms: 999000
publish:
summary: Currency things
operationId: onCurrencyUpdate
message:
schemaFormat: "application/vnd.apache.avro+json;version=1.9.0"
contentType: "application/octet-stream"
bindings:
kafka:
schemaIdLocation: "header"
schemaLookupStrategy: "RecordNameStrategy"
key:
type: string
payload:
$ref: "/schema/com.example.shared.Currency.avsc"

0 comments on commit b8be16d

Please sign in to comment.