Skip to content

Commit

Permalink
Report incompatible schema (#423)
Browse files Browse the repository at this point in the history
fixes: #422

Previously, it was the change set calculator that checked for schema compatibility, setting the status to `FAILED`. As there was no mutator to handle `FAILED` schemas, any `FAILED` schemas were filtered out and not returned to the caller.

This change sees the compatibility check moved to the mutator.

Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
big-andy-coates and Andy Coates authored Nov 25, 2024
1 parent 7b195f7 commit cf45f0a
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 129 deletions.
14 changes: 14 additions & 0 deletions kafka/src/main/java/io/specmesh/kafka/KafkaApiSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,20 @@ public static KafkaApiSpec loadFromFileSystem(final String spec) {
}
}

/**
 * Parses an API spec supplied as raw text (rather than a file-system path).
 *
 * @param spec the full textual contents of the spec document.
 * @return the parsed spec.
 * @throws APIException if the contents cannot be parsed as a valid spec; the original
 *     parse failure is preserved as the cause.
 */
public static KafkaApiSpec loadFromString(final String spec) {
    try {
        final var parsed = new AsyncApiParser().loadResource(spec);
        return new KafkaApiSpec(parsed);
    } catch (final Exception e) {
        throw new APIException("Failed to load spec:" + spec, e);
    }
}

/**
 * @return the underlying parsed spec model backing this instance.
 */
public ApiSpec apiSpec() {
    return apiSpec;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.specmesh.kafka.provision.AclProvisioner.Acl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
Expand Down Expand Up @@ -101,7 +101,7 @@ public Collection<Acl> read(
.toString()
.contains(
"org.apache.kafka.common.errors.SecurityDisabledException")) {
return List.of();
return new ArrayList<>();
}
throw new ProvisioningException("Failed to read ACLs", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.specmesh.kafka.provision.Status;
import io.specmesh.kafka.provision.schema.SchemaProvisioner.Schema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -82,12 +79,6 @@ public Collection<Schema> calculate(
/** Returns those schemas to create and ignores existing */
static final class UpdateCalculator implements ChangeSetCalculator {

private final SchemaRegistryClient client;

private UpdateCalculator(final SchemaRegistryClient client) {
this.client = client;
}

@Override
public Collection<Schema> calculate(
final Collection<Schema> existing,
Expand All @@ -97,29 +88,9 @@ public Collection<Schema> calculate(
return required.stream()
.filter(needs -> hasChanged(needs, existingList))
.peek(
schema -> {
schema.messages(schema.messages() + "\n Update");
try {
final var compatibilityMessages =
client.testCompatibilityVerbose(
schema.subject(), schema.schema());

if (!compatibilityMessages.isEmpty()) {
schema.messages(
schema.messages()
+ "\nCompatibility test output:"
+ compatibilityMessages);

schema.state(Status.STATE.FAILED);
} else {
schema.state(Status.STATE.UPDATE);
}

} catch (IOException | RestClientException ex) {
schema.state(Status.STATE.FAILED);
schema.messages(schema.messages() + "\nException:" + ex);
}
})
schema ->
schema.messages(schema.messages() + "\n Update")
.state(Status.STATE.UPDATE))
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -243,19 +214,15 @@ static ChangeSetBuilder builder() {
* build it
*
* @param cleanUnspecified - cleanup
* @param client sr client
* @return required calculator
*/
ChangeSetCalculator build(
final boolean cleanUnspecified, final SchemaRegistryClient client) {
ChangeSetCalculator build(final boolean cleanUnspecified) {
if (cleanUnspecified) {
return new CleanUnspecifiedCalculator();

} else {
return new Collective(
new IgnoreCalculator(),
new UpdateCalculator(client),
new CreateCalculator());
new IgnoreCalculator(), new UpdateCalculator(), new CreateCalculator());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.specmesh.kafka.provision.ProvisioningException;
import io.specmesh.kafka.provision.Status;
import io.specmesh.kafka.provision.Status.STATE;
import io.specmesh.kafka.provision.schema.SchemaProvisioner.Schema;
import java.io.IOException;
Expand Down Expand Up @@ -149,27 +150,36 @@ public UpdateMutator(final SchemaRegistryClient client) {
public List<Schema> mutate(final Collection<Schema> schemas) {
return schemas.stream()
.filter(schema -> schema.state().equals(STATE.UPDATE))
.peek(
schema -> {
try {
final var schemaId =
client.register(schema.subject(), schema.schema());
schema.state(UPDATED);
schema.messages(
"Subject:"
+ schema.subject()
+ " Updated with id: "
+ schemaId);
} catch (IOException | RestClientException e) {
schema.exception(
new ProvisioningException(
"Failed to update schema:" + schema.subject(),
e));
schema.state(FAILED);
}
})
.map(this::register)
.collect(Collectors.toList());
}

/**
 * Compatibility-checks and, if compatible, registers the supplied schema.
 *
 * <p>If the registry reports the schema as incompatible with the subject's existing
 * versions, the schema's state is set to FAILED and the compatibility report is appended
 * to its messages; no registration is attempted. Registry I/O errors are likewise
 * captured on the schema (as a {@link ProvisioningException}) rather than thrown, so the
 * caller always receives the schema back with its outcome recorded.
 *
 * @param schema the schema to check and register; mutated in place.
 * @return the same schema instance, with state and messages updated.
 */
private Schema register(final Schema schema) {
    try {
        final var incompatibilities =
                client.testCompatibilityVerbose(schema.subject(), schema.schema());

        if (incompatibilities.isEmpty()) {
            // Compatible: register and record the new schema id.
            final var schemaId = client.register(schema.subject(), schema.schema());
            schema.state(UPDATED);
            schema.messages("Subject:" + schema.subject() + " Updated with id: " + schemaId);
        } else {
            // Incompatible: surface the registry's verbose report to the caller.
            schema.messages(
                    schema.messages()
                            + "\nCompatibility test output:"
                            + incompatibilities);
            schema.state(Status.STATE.FAILED);
        }
    } catch (IOException | RestClientException e) {
        schema.state(FAILED);
        schema.exception(
                new ProvisioningException(
                        "Failed to update schema:" + schema.subject(), e));
    }
    return schema;
}
}

/** Mutate Schemas */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static List<Schema> provision(
}

final Collection<Schema> schemas =
calculator(client, cleanUnspecified).calculate(existing, required, apiSpec.id());
calculator(cleanUnspecified).calculate(existing, required, apiSpec.id());
return mutator(dryRun, cleanUnspecified, client).mutate(schemas);
}

Expand Down Expand Up @@ -112,8 +112,8 @@ private static SchemaMutators.SchemaMutator mutator(
* @return calculator
*/
private static SchemaChangeSetCalculators.ChangeSetCalculator calculator(
final SchemaRegistryClient client, final boolean cleanUnspecified) {
return SchemaChangeSetCalculators.builder().build(cleanUnspecified, client);
final boolean cleanUnspecified) {
return SchemaChangeSetCalculators.builder().build(cleanUnspecified);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.kafka.schemaregistry.ParsedSchema;
Expand Down Expand Up @@ -59,6 +60,10 @@ class SchemaProvisionerFunctionalTest {

private static final KafkaApiSpec API_UPDATE_SPEC =
TestSpecLoader.loadFromClassPath("provisioner-update-functional-test-api.yaml");

private static final KafkaApiSpec API_INCOMPATiBLE_SPEC =
TestSpecLoader.loadFromClassPath("provisioner-incompatible-functional-test-api.yaml");

private SchemaRegistryClient srClient;

private enum Domain {
Expand Down Expand Up @@ -173,6 +178,43 @@ void shouldPublishUpdatedSchemas() throws Exception {

@Test
@Order(3)
// Verifies that an incompatible schema change is surfaced to the caller as a FAILED schema
// rather than being silently filtered out, and that the registry is left untouched.
// NOTE(review): the constant name API_INCOMPATiBLE_SPEC contains a lowercase 'i' typo —
// consider renaming it (declaration lives on the test class, alongside its other specs).
void shouldFailToPublishIncompatibleSchema() throws Exception {
    // Dry run: the provisioner proposes the incompatible change as an UPDATE, since
    // compatibility is only checked by the mutator during a real (non-dry) run.
    final Collection<Schema> dryRunChangeset =
            SchemaProvisioner.provision(
                    true, false, API_INCOMPATiBLE_SPEC, "./src/test/resources", srClient);

    // Verify - the Update is proposed
    assertThat(
            dryRunChangeset.stream().filter(topic -> topic.state() == STATE.UPDATE).count(),
            is(1L));

    // Verify - should have 3 SR entries (1 was updated, 2 was from original spec)
    assertThat(
            srClient.getAllSubjects(),
            containsInAnyOrder(
                    "simple.provision_demo._public.user_signed_up-key",
                    "simple.provision_demo._public.user_signed_up-value",
                    "simple.provision_demo._protected.user_info-value"));

    // Real run: the mutator's compatibility check should reject the change.
    final Collection<Schema> updateChangeset =
            SchemaProvisioner.provision(
                    false, false, API_INCOMPATiBLE_SPEC, "./src/test/resources", srClient);

    // The rejected schema is returned in FAILED state with the registry's
    // compatibility report (reader-field-missing-default) attached to its messages.
    assertThat(updateChangeset, hasSize(1));
    final Schema schema = updateChangeset.iterator().next();
    assertThat(schema.state(), is(STATE.FAILED));
    assertThat(schema.messages(), containsString("READER_FIELD_MISSING_DEFAULT_VALUE"));

    final var parsedSchemas = srClient.getSchemas(schema.subject(), false, true);

    // The incompatible ("borked") field must not have reached the registry.
    assertThat(
            "Schema should not be updated",
            parsedSchemas.get(0).canonicalString(),
            not(containsString("borked")));
}

@Test
@Order(4)
void shouldRemoveUnspecdSchemas() throws Exception {

final var subject = "simple.provision_demo._public.NOT_user_signed_up-value";
Expand Down
Loading

0 comments on commit cf45f0a

Please sign in to comment.