diff --git a/cli/src/main/java/io/specmesh/cli/Provision.java b/cli/src/main/java/io/specmesh/cli/Provision.java index 6d6585bd..1cd8e3f8 100644 --- a/cli/src/main/java/io/specmesh/cli/Provision.java +++ b/cli/src/main/java/io/specmesh/cli/Provision.java @@ -90,13 +90,22 @@ public static void main(final String[] args) { @Option( names = {"-dry", "--dry-run"}, description = - "Compares the cluster against the spec, outputting proposed changes if" - + " compatible.If the spec incompatible with the cluster (not sure how it" - + " could be) then will fail with a descriptive error message.A return" - + " value of 0=indicates no changes needed; 1=changes needed; -1=not" - + " compatible, blah blah") + "Compares the cluster resources against the spec, outputting proposed changes" + + " if compatible. If the spec incompatible with the cluster (not sure how" + + " it could be) then will fail with a descriptive error message. A return" + + " value of '0' = indicates no changes needed; '1' = changes needed; '-1'" + + " not compatible") private boolean dryRun; + @Option( + names = {"-clean", "--clean-unspecified"}, + description = + "Compares the cluster resources against the spec, outputting proposed set of" + + " resources that are unexpected (not specified). Use with '-dry-run' for" + + " non-destructive checks. 
This operation will not create resources, it" + + " will only remove unspecified resources") + private boolean cleanUnspecified; + @Option( names = "-D", mapFallbackValue = "", @@ -113,6 +122,7 @@ public Integer call() throws Exception { final var status = Provisioner.provision( dryRun, + cleanUnspecified, specMeshSpec(), schemaPath, Clients.adminClient(brokerUrl, username, secret), diff --git a/cli/src/test/java/io/specmesh/cli/StorageConsumptionFunctionalTest.java b/cli/src/test/java/io/specmesh/cli/StorageConsumptionFunctionalTest.java index a944ed5f..91153c2d 100644 --- a/cli/src/test/java/io/specmesh/cli/StorageConsumptionFunctionalTest.java +++ b/cli/src/test/java/io/specmesh/cli/StorageConsumptionFunctionalTest.java @@ -75,6 +75,7 @@ class StorageConsumptionFunctionalTest { void shouldGetStorageAndConsumptionMetrics() throws Exception { Provisioner.provision( + false, false, API_SPEC, "./build/resources/test", diff --git a/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java b/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java index c2198612..45e90e19 100644 --- a/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java +++ b/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java @@ -107,6 +107,7 @@ public static void provision() { final SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(KAFKA_ENV.schemeRegistryServer(), 5); Provisioner.provision( + false, false, API_SPEC, "./build/resources/test", diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/Provisioner.java b/kafka/src/main/java/io/specmesh/kafka/provision/Provisioner.java index 5a1cd6b3..973a6307 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/Provisioner.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/Provisioner.java @@ -32,6 +32,7 @@ private Provisioner() {} * Provision Topics, ACLS and schemas * * @param dryRun test or execute + * @param cleanUnspecified 
when true, calculate and remove cluster resources that are not in the spec * @param apiSpec given spec * @param schemaResources schema path * @param adminClient kafka admin client @@ -41,6 +42,7 @@ private Provisioner() {} */ public static Status provision( final boolean dryRun, + final boolean cleanUnspecified, final KafkaApiSpec apiSpec, final String schemaResources, final Admin adminClient, @@ -49,7 +51,10 @@ public static Status provision( apiSpec.apiSpec().validate(); final var status = - Status.builder().topics(TopicProvisioner.provision(dryRun, apiSpec, adminClient)); + Status.builder() + .topics( + TopicProvisioner.provision( + dryRun, cleanUnspecified, apiSpec, adminClient)); schemaRegistryClient.ifPresent( registryClient -> status.schemas( diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/TopicChangeSetCalculators.java b/kafka/src/main/java/io/specmesh/kafka/provision/TopicChangeSetCalculators.java index 39401bae..0e8c3afa 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/TopicChangeSetCalculators.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/TopicChangeSetCalculators.java @@ -150,6 +150,22 @@ public Collection calculate( .collect(Collectors.toList()); } } + /** Returns topics that exist in the cluster but are not specified in the spec */ + public static final class UnspecifiedCalculator implements ChangeSetCalculator { + + /** + * Calculate changes of topics that are unspecified + * + * @param existingTopics - existing + * @param requiredTopics - set of topics that should exist + * @return list of existing topics that are not required by the spec + */ + public Collection calculate( + final Collection existingTopics, final Collection requiredTopics) { + existingTopics.removeAll(requiredTopics); + return existingTopics; + } + } /** Main API */ interface ChangeSetCalculator { /** @@ -182,10 +198,15 @@ public static ChangeSetBuilder builder() { /** * build it * + * @param cleanUnspecified - to remove unspecified resources * @return required calculator */ - public ChangeSetCalculator build() { - return new 
CollectiveCalculator(new CreateCalculator(), new UpdateCalculator()); + public ChangeSetCalculator build(final boolean cleanUnspecified) { + if (cleanUnspecified) { + return new UnspecifiedCalculator(); + } else { + return new CollectiveCalculator(new CreateCalculator(), new UpdateCalculator()); + } } } } diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/TopicWriters.java b/kafka/src/main/java/io/specmesh/kafka/provision/TopicMutators.java similarity index 69% rename from kafka/src/main/java/io/specmesh/kafka/provision/TopicWriters.java rename to kafka/src/main/java/io/specmesh/kafka/provision/TopicMutators.java index 5a242575..37f7ca6d 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/TopicWriters.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/TopicMutators.java @@ -42,19 +42,19 @@ import org.apache.kafka.common.config.ConfigResource; /** Write topics using provided input set */ -public class TopicWriters { +public class TopicMutators { - /** Collection based */ - public static final class CollectiveWriter implements TopicWriter { + /** collection based */ + public static final class CollectiveMutator implements TopicMutator { - private final Stream writers; + private final Stream writers; /** * iterate over the writers * * @param writers to iterate */ - private CollectiveWriter(final TopicWriter... writers) { + private CollectiveMutator(final TopicMutator... writers) { this.writers = Arrays.stream(writers); } @@ -65,16 +65,16 @@ private CollectiveWriter(final TopicWriter... 
writers) { * @return updated status */ @Override - public Collection write(final Collection topics) { + public Collection mutate(final Collection topics) { return this.writers - .map(writer -> writer.write(topics)) + .map(writer -> writer.mutate(topics)) .flatMap(Collection::stream) .collect(Collectors.toList()); } } - /** only handles update requests */ - public static final class UpdateWriter implements TopicWriter { + /** updates */ + public static final class UpdateMutator implements TopicMutator { private final Admin adminClient; @@ -83,7 +83,7 @@ public static final class UpdateWriter implements TopicWriter { * * @param adminClient - cluster connection */ - UpdateWriter(final Admin adminClient) { + UpdateMutator(final Admin adminClient) { this.adminClient = adminClient; } @@ -94,7 +94,7 @@ public static final class UpdateWriter implements TopicWriter { * @return topics with updated flag * @throws ProvisioningException when things break */ - public Collection write(final Collection topics) + public Collection mutate(final Collection topics) throws ProvisioningException { final var topicsToUpdate = @@ -200,17 +200,77 @@ private void updatePartitions(final Topic topic, final TopicDescription descript } } - /** creates the topic */ - public static final class CreateWriter implements TopicWriter { + /** delete non-spec resources */ + public static final class CleanUnspecifiedMutator implements TopicMutator { + private final boolean dryRun; private final Admin adminClient; /** * Needs the admin client * + * @param dryRun - test or execute flag * @param adminClient - cluster connection */ - private CreateWriter(final Admin adminClient) { + CleanUnspecifiedMutator(final boolean dryRun, final Admin adminClient) { + this.dryRun = dryRun; + this.adminClient = adminClient; + } + + /** + * Remove topics that are not CREATE or UPDATE (i.e. 
not in the spec) + * + * @param topics to write + * @return topics with updated flag + * @throws ProvisioningException when things break + */ + public Collection mutate(final Collection topics) + throws ProvisioningException { + + // spec topics will have CREATE or UPDATE status - remove the others (unwanted) + final var unwanted = + topics.stream() + .filter( + topic -> + !topic.state().equals(STATE.CREATE) + && !topic.state().equals(STATE.UPDATE)) + .collect(Collectors.toList()); + + try { + if (!dryRun) { + adminClient + .deleteTopics(toTopicNames(unwanted)) + .all() + .get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS); + } + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + throw new ProvisioningException("Failed to cleanup unwanted topics", ex); + } + return unwanted; + } + + /** + * convert to names + * + * @param topicsToUpdate source list + * @return just the names + */ + private List toTopicNames(final List topicsToUpdate) { + return topicsToUpdate.stream().map(Topic::name).collect(Collectors.toList()); + } + } + + /** creations */ + public static final class CreateMutator implements TopicMutator { + + private final Admin adminClient; + + /** + * Needs the admin client + * + * @param adminClient - cluster connection + */ + private CreateMutator(final Admin adminClient) { + this.adminClient = adminClient; + } @@ -221,7 +281,7 @@ private CreateWriter(final Admin adminClient) { * @return topics with updated flag * @throws ProvisioningException when things break */ - public Collection write(final Collection topics) + public Collection mutate(final Collection topics) throws ProvisioningException { final var topicsToCreate = @@ -266,8 +326,8 @@ private Collection asNewTopic(final Collection topics) { } } - /** Noop write that does nada */ - public static final class NoopWriter implements TopicWriter { + /** No-op mutator that returns the given topics unchanged */ + public static final class NoopMutator implements TopicMutator { /** * Do nothing write * @@ -275,33 
+335,35 @@ public static final class NoopWriter implements TopicWriter { * @return unmodified list * @throws ProvisioningException when things go wrong */ - public Collection write(final Collection topics) + public Collection mutate(final Collection topics) throws ProvisioningException { return topics; } } - /** Interface for writing topics to kafka */ - interface TopicWriter { + /** Interface for writing/mutating topics to kafka */ + interface TopicMutator { /** * Api for writing * - * @param topics to write + * @param topics to mutate + * @return updated state of topics written */ - Collection write(Collection topics); + Collection mutate(Collection topics); } - /** TopicWriter builder */ + /** TopicMutator builder */ @SuppressFBWarnings( value = "EI_EXPOSE_REP2", justification = "adminClient() passed as param to prevent API pollution") - public static final class TopicWriterBuilder { + public static final class TopicMutatorBuilder { private Admin adminClient; - private boolean noopWriter; + private boolean noop; + private boolean cleanUnspecified; + private boolean dryRun; /** defensive */ - private TopicWriterBuilder() {} + private TopicMutatorBuilder() {} /** * add the adminClient @@ -309,19 +371,32 @@ private TopicWriterBuilder() {} * @param adminClient - cluster connection * @return builder */ - public TopicWriterBuilder adminClient(final Admin adminClient) { + public TopicMutatorBuilder adminClient(final Admin adminClient) { this.adminClient = adminClient; return this; } /** - * use a noop writer + * use a noop mutator * * @param dryRun - true is dry running * @return the builder */ - public TopicWriterBuilder noopWriter(final boolean dryRun) { - this.noopWriter = dryRun; + public TopicMutatorBuilder noopMutator(final boolean dryRun) { + this.noop = dryRun; + return this; + } + /** + * use the delete mutator + * + * @param cleanUnspecified - to cleanup resources + * @param dryRun - when true, only validate/report the proposed operation without executing it + * @return the builder + */ + 
public TopicMutatorBuilder cleanUnspecified( + final boolean cleanUnspecified, final boolean dryRun) { + this.cleanUnspecified = cleanUnspecified; + this.dryRun = dryRun; return this; } @@ -330,21 +405,23 @@ public TopicWriterBuilder noopWriter(final boolean dryRun) { * * @return builder */ - public static TopicWriterBuilder builder() { - return new TopicWriterBuilder(); + public static TopicMutatorBuilder builder() { + return new TopicMutatorBuilder(); } /** * build it * - * @return the specified topic writer impl + * @return the specified topic mutator impl */ - public TopicWriter build() { - if (noopWriter) { - return new NoopWriter(); + public TopicMutator build() { + if (cleanUnspecified) { + return new CleanUnspecifiedMutator(dryRun, adminClient); + } else if (noop) { + return new NoopMutator(); } else { - return new CollectiveWriter( - new CreateWriter(adminClient), new UpdateWriter(adminClient)); + return new CollectiveMutator( + new CreateMutator(adminClient), new UpdateMutator(adminClient)); } } } diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/TopicProvisioner.java b/kafka/src/main/java/io/specmesh/kafka/provision/TopicProvisioner.java index 4dff77eb..2b78db64 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/TopicProvisioner.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/TopicProvisioner.java @@ -39,17 +39,21 @@ private TopicProvisioner() {} * Provision topics in the Kafka cluster. * * @param dryRun test or execute + * @param cleanUnspecified remove unwanted resources * @param apiSpec the api spec. * @param adminClient admin client for the Kafka cluster. 
* @return number of topics created * @throws Provisioner.ProvisioningException on provision failure */ public static Collection provision( - final boolean dryRun, final KafkaApiSpec apiSpec, final Admin adminClient) { + final boolean dryRun, + final boolean cleanUnspecified, + final KafkaApiSpec apiSpec, + final Admin adminClient) { final var domain = domainTopicsFromApiSpec(apiSpec); final var existing = reader(apiSpec, adminClient).readall(); - final var changeSet = comparator().calculate(existing, domain); - return writer(dryRun, adminClient).write(changeSet); + final var changeSet = comparator(cleanUnspecified).calculate(existing, domain); + return mutate(dryRun, cleanUnspecified, adminClient).mutate(changeSet); } /** @@ -57,19 +61,25 @@ public static Collection provision( * * @return comparator */ - private static TopicChangeSetCalculators.ChangeSetCalculator comparator() { - return TopicChangeSetCalculators.ChangeSetBuilder.builder().build(); + private static TopicChangeSetCalculators.ChangeSetCalculator comparator( + final boolean cleanUnspecified) { + return TopicChangeSetCalculators.ChangeSetBuilder.builder().build(cleanUnspecified); } /** * Gets a writer * - * @param dryRun to ignore writing to the cluster - * @param adminClient - cluster connection + * @param dryRun - to ignore writing to the cluster + * @param cleanupUnspecified - remove set of unspec'd resources + * @param adminClient - cluster connection * @return configured writer */ - private static TopicWriters.TopicWriter writer(final boolean dryRun, final Admin adminClient) { - final var topicWriterBuilder = TopicWriters.TopicWriterBuilder.builder().noopWriter(dryRun); + private static TopicMutators.TopicMutator mutate( + final boolean dryRun, final boolean cleanupUnspecified, final Admin adminClient) { + final var topicWriterBuilder = + TopicMutators.TopicMutatorBuilder.builder() + .noopMutator(dryRun) + .cleanUnspecified(cleanupUnspecified, dryRun); return 
topicWriterBuilder.adminClient(adminClient).build(); } diff --git a/kafka/src/test/java/io/specmesh/kafka/ExporterFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/ExporterFunctionalTest.java index dc5545ac..69f41298 100644 --- a/kafka/src/test/java/io/specmesh/kafka/ExporterFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/ExporterFunctionalTest.java @@ -96,7 +96,7 @@ private enum Domain { @BeforeAll static void setUp() { try (Admin adminClient = KAFKA_ENV.adminClient()) { - TopicProvisioner.provision(false, API_SPEC, adminClient); + TopicProvisioner.provision(false, false, API_SPEC, adminClient); AclProvisioner.provision(false, API_SPEC, adminClient); } } diff --git a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java index ab804420..3045be4f 100644 --- a/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/KafkaAPISpecFunctionalTest.java @@ -128,7 +128,7 @@ private enum Domain { @BeforeAll static void setUp() { try (Admin adminClient = KAFKA_ENV.adminClient()) { - TopicProvisioner.provision(false, API_SPEC, adminClient); + TopicProvisioner.provision(false, false, API_SPEC, adminClient); AclProvisioner.provision(false, API_SPEC, adminClient); } } diff --git a/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java index e49bb696..bc2ee462 100644 --- a/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java @@ -113,7 +113,7 @@ private enum Domain { void shouldDryRunTopicsFromEmptyCluster() { try (Admin adminClient = KAFKA_ENV.adminClient()) { - final var changeset = TopicProvisioner.provision(true, API_SPEC, adminClient); + final var changeset = TopicProvisioner.provision(true, 
false, API_SPEC, adminClient); assertThat( changeset.stream().map(Topic::name).collect(Collectors.toSet()), @@ -225,7 +225,7 @@ private static CachedSchemaRegistryClient srClient() { void shouldProvisionTopicsFromEmptyCluster() throws ExecutionException, InterruptedException { try (Admin adminClient = KAFKA_ENV.adminClient()) { - final var changeSet = TopicProvisioner.provision(false, API_SPEC, adminClient); + final var changeSet = TopicProvisioner.provision(false, false, API_SPEC, adminClient); // Verify - changeset assertThat( diff --git a/kafka/src/test/java/io/specmesh/kafka/ProvisionerUpdatingFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/ProvisionerUpdatingFunctionalTest.java index f5d97aba..262327d0 100644 --- a/kafka/src/test/java/io/specmesh/kafka/ProvisionerUpdatingFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/ProvisionerUpdatingFunctionalTest.java @@ -28,6 +28,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -48,8 +49,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.config.TopicConfig; @@ -111,7 +116,7 @@ private enum Domain { @Order(1) void shouldProvisionExistingSpec() { try (Admin adminClient = KAFKA_ENV.adminClient()) { - TopicProvisioner.provision(false, API_SPEC, adminClient); + TopicProvisioner.provision(false, false, API_SPEC, adminClient); 
AclProvisioner.provision(false, API_SPEC, adminClient); SchemaProvisioner.provision(false, API_SPEC, "./build/resources/test", srClient()); } @@ -124,18 +129,19 @@ void shouldDoTopicUpdates() { // DRY RUN Test final var dryRunChangeset = - TopicProvisioner.provision(true, API_UPDATE_SPEC, adminClient); + TopicProvisioner.provision(true, false, API_UPDATE_SPEC, adminClient); assertThat( dryRunChangeset.stream().map(Topic::name).collect(Collectors.toSet()), - is(containsInAnyOrder(USER_SIGNED_UP))); + is(hasItem(USER_SIGNED_UP))); final var dryFirstUpdate = dryRunChangeset.iterator().next(); assertThat(dryFirstUpdate.state(), is(STATE.UPDATE)); // REAL Test - final var changeset = TopicProvisioner.provision(false, API_UPDATE_SPEC, adminClient); + final var changeset = + TopicProvisioner.provision(false, false, API_UPDATE_SPEC, adminClient); final var change = changeset.iterator().next(); @@ -210,6 +216,55 @@ void shouldPublishUpdatedAcls() { } } + @Test + @Order(5) + void shouldCleanupNonSpecTopicsDryRun() + throws ExecutionException, InterruptedException, TimeoutException { + try (Admin adminClient = KAFKA_ENV.adminClient()) { + adminClient + .createTopics( + List.of(new NewTopic(API_SPEC.id() + ".should.not.be", 1, (short) 1))) + .all() + .get(20, TimeUnit.SECONDS); + + assertThat(topicCount(adminClient), is(3L)); + + // create the unspecified topic + final var unSpecifiedTopics = + TopicProvisioner.provision(true, true, API_SPEC, adminClient); + // 'should.not.be' topic that should not be + assertThat(unSpecifiedTopics, is(hasSize(1))); + assertThat(topicCount(adminClient), is(3L)); + } + } + + @Test + @Order(6) + void shouldCleanupNonSpecTopicsIRL() + throws ExecutionException, InterruptedException { + try (Admin adminClient = KAFKA_ENV.adminClient()) { + + assertThat(topicCount(adminClient), is(3L)); + + // create the unspecified topic + final var unSpecifiedTopics = + TopicProvisioner.provision(false, true, API_SPEC, adminClient); + + // 'should.not.be' topic 
that should not be + assertThat(unSpecifiedTopics, is(hasSize(1))); + + // 'should.not.be' topic was removed + assertThat(topicCount(adminClient), is(2L)); + } + } + + private static long topicCount(final Admin adminClient) + throws InterruptedException, ExecutionException { + return adminClient.listTopics().listings().get().stream() + .filter(t -> t.name().startsWith(API_SPEC.id())) + .count(); + } + private static CachedSchemaRegistryClient srClient() { return new CachedSchemaRegistryClient( KAFKA_ENV.schemeRegistryServer(), diff --git a/kafka/src/test/java/io/specmesh/kafka/admin/SimpleAdminClientTest.java b/kafka/src/test/java/io/specmesh/kafka/admin/SimpleAdminClientTest.java index 2e784cf8..f7951e38 100644 --- a/kafka/src/test/java/io/specmesh/kafka/admin/SimpleAdminClientTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/admin/SimpleAdminClientTest.java @@ -78,6 +78,7 @@ void shouldRecordStats() throws ExecutionException, InterruptedException, Timeou try (Admin adminClient = KAFKA_ENV.adminClient()) { final var client = SmAdminClient.create(adminClient); Provisioner.provision( + false, false, API_SPEC, "./build/resources/test", diff --git a/kafka/src/test/java/io/specmesh/kafka/provision/TopicWritersTest.java b/kafka/src/test/java/io/specmesh/kafka/provision/TopicMutatorsTest.java similarity index 93% rename from kafka/src/test/java/io/specmesh/kafka/provision/TopicWritersTest.java rename to kafka/src/test/java/io/specmesh/kafka/provision/TopicMutatorsTest.java index 56ee6ace..07f3edc9 100644 --- a/kafka/src/test/java/io/specmesh/kafka/provision/TopicWritersTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/provision/TopicMutatorsTest.java @@ -23,8 +23,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import io.specmesh.kafka.provision.TopicMutators.UpdateMutator; import io.specmesh.kafka.provision.TopicProvisioner.Topic; -import io.specmesh.kafka.provision.TopicWriters.UpdateWriter; import java.util.List; import 
java.util.Map; import java.util.concurrent.TimeUnit; @@ -41,14 +41,15 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +/** Fussy mutation tests */ @ExtendWith(MockitoExtension.class) -class TopicWritersTest { +class TopicMutatorsTest { @Mock Admin client; @Test void shouldWriteUpdatesForRetentionChange() throws Exception { - final var topicWriter = new UpdateWriter(client); + final var topicWriter = new UpdateMutator(client); // Mock hell - crappy admin api final var topicDescriptionFuture = mock(KafkaFuture.class); @@ -71,7 +72,7 @@ void shouldWriteUpdatesForRetentionChange() throws Exception { .partitions(1) .config(Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000")) .build()); - final var updated = topicWriter.write(updates); + final var updated = topicWriter.mutate(updates); final var next = updated.iterator().next(); // verify @@ -82,7 +83,7 @@ void shouldWriteUpdatesForRetentionChange() throws Exception { @Test void shouldWriteUpdatesToPartitionsWhenLarger() throws Exception { - final var topicWriter = new UpdateWriter(client); + final var topicWriter = new UpdateMutator(client); // Mock hell - crappy admin api final var topicDescriptionFuture = mock(KafkaFuture.class); @@ -111,7 +112,7 @@ void shouldWriteUpdatesToPartitionsWhenLarger() throws Exception { .state(Status.STATE.UPDATE) .partitions(999) .build()); - final var updated = topicWriter.write(updates); + final var updated = topicWriter.mutate(updates); final var next = updated.iterator().next(); // verify