Skip to content

Commit

Permalink
Part 1 of #58 - fixing state mgmt
Browse files Browse the repository at this point in the history
  • Loading branch information
bluemonk3y committed Oct 2, 2023
1 parent ee689ca commit 5763b8f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,21 @@ public Collection<Topic> mutate(final Collection<Topic> topics)
.collect(Collectors.toList());

try {
unwanted.forEach(topic -> topic.state(STATE.DELETE));
if (!dryRun) {
adminClient
.deleteTopics(toTopicNames(unwanted))
.all()
.get(Provisioner.REQUEST_TIMEOUT, TimeUnit.SECONDS);
unwanted.forEach(topic -> topic.state(STATE.DELETED));
}
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
throw new ProvisioningException("Failed to cleanup unwanted topics", ex);
unwanted.forEach(
topic ->
topic.exception(
new ProvisioningException(
"failed to delete topics", ex))
.state(STATE.FAILED));
}
return unwanted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ void shouldCleanupNonSpecTopicsDryRun()
TopicProvisioner.provision(true, true, API_SPEC, adminClient);
// 'should.not.be' topic that should not be
assertThat(unSpecifiedTopics, is(hasSize(1)));
assertThat(unSpecifiedTopics.iterator().next().state(), is(STATE.DELETE));
assertThat(topicCount(adminClient), is(3L));
}
}
Expand All @@ -251,6 +252,7 @@ void shouldCleanupNonSpecTopicsIRL() throws ExecutionException, InterruptedExcep

// 'should.not.be' topic that should not be
assertThat(unSpecifiedTopics, is(hasSize(1)));
assertThat(unSpecifiedTopics.iterator().next().state(), is(STATE.DELETED));

// 'should.not.be' topic was removed
assertThat(topicCount(adminClient), is(2L));
Expand Down

0 comments on commit 5763b8f

Please sign in to comment.