diff --git a/CHANGELOG.md b/CHANGELOG.md index b3ba24b7c6..f212489f46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * after a scaling up, the operator triggers an auto-rebalancing for moving some of the existing partitions to the newly added brokers. * before scaling down, and if the brokers to remove are hosting partitions, the operator triggers an auto-rebalancing to these partitions off the brokers to make them free to be removed. * Strimzi Access Operator 0.1.0 added to the installation files and examples +* Allow rolling update for new cluster CA trust (during Cluster CA key replacement) to continue where it left off before interruption without rolling all pods again. ### Changes, deprecations and removals diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/WorkloadUtils.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/WorkloadUtils.java index d460dc7226..4f2997960b 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/WorkloadUtils.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/WorkloadUtils.java @@ -183,6 +183,36 @@ public static StrimziPodSet createPodSet( .build(); } + /** + * Patch a Strimzi PodSet to merge the provided annotations with the annotations on the Pod resources defined + * in the PodSet + * + * @param strimziPodSet Strimzi PodSet to patch + * @param annotationsToBeUpdated Annotations to merge with the existing annotations + * + * @return Patched PodSet + */ + public static StrimziPodSet patchAnnotations(StrimziPodSet strimziPodSet, Map annotationsToBeUpdated) { + List> newPods = PodSetUtils.podSetToPods(strimziPodSet) + .stream() + .map(pod -> { + Map updatedAnnotations = pod.getMetadata().getAnnotations(); + updatedAnnotations.putAll(annotationsToBeUpdated); + return pod.edit() + .editMetadata() + .withAnnotations(updatedAnnotations) + .endMetadata() + .build(); + }) + .map(PodSetUtils::podToMap) + .toList(); + return new StrimziPodSetBuilder(strimziPodSet) + .editSpec() + .withPods(newPods) + .endSpec() + .build(); + } + /** * Creates a stateful Pod for use with StrimziPodSets. Stateful in this context means that it has a stable name and * typically uses storage. diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/CaReconciler.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/CaReconciler.java index 3a6288465e..d0cf3dfdf5 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/CaReconciler.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/CaReconciler.java @@ -24,6 +24,7 @@ import io.strimzi.operator.cluster.model.NodeRef; import io.strimzi.operator.cluster.model.RestartReason; import io.strimzi.operator.cluster.model.RestartReasons; +import io.strimzi.operator.cluster.model.WorkloadUtils; import io.strimzi.operator.cluster.operator.resource.KafkaAgentClientProvider; import io.strimzi.operator.cluster.operator.resource.KafkaRoller; import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; @@ -55,11 +56,11 @@ import java.time.Clock; import java.util.ArrayList; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; /** * Class used for reconciliation of Cluster and Client CAs. This class contains both the steps of the CA reconciliation @@ -395,8 +396,8 @@ Future rollingUpdateForNewCaKey() { TlsPemIdentity coTlsPemIdentity = new TlsPemIdentity(new PemTrustSet(clusterCa.caCertSecret()), PemAuthIdentity.clusterOperator(coSecret)); return getZooKeeperReplicas() .compose(replicas -> maybeRollZookeeper(replicas, podRollReasons, coTlsPemIdentity)) - .compose(i -> getKafkaReplicas()) - .compose(nodes -> rollKafkaBrokers(nodes, podRollReasons, coTlsPemIdentity)) + .compose(i -> patchCaGenerationAndReturnNodes()) + .compose(nodes -> rollKafka(nodes, podRollReasons, coTlsPemIdentity)) .compose(i -> maybeRollDeploymentIfExists(KafkaResources.entityOperatorDeploymentName(reconciliation.name()), podRollReasons)) .compose(i -> maybeRollDeploymentIfExists(KafkaExporterResources.componentName(reconciliation.name()), podRollReasons)) .compose(i -> maybeRollDeploymentIfExists(CruiseControlResources.componentName(reconciliation.name()), podRollReasons)); @@ -527,27 +528,40 @@ Future rollingUpdateForNewCaKey() { } } - /* test */ Future> getKafkaReplicas() { + /* test */ Future> patchCaGenerationAndReturnNodes() { Labels selectorLabels = Labels.EMPTY .withStrimziKind(reconciliation.kind()) .withStrimziCluster(reconciliation.name()) .withStrimziName(KafkaResources.kafkaComponentName(reconciliation.name())); - return strimziPodSetOperator.listAsync(reconciliation.namespace(), selectorLabels) .compose(podSets -> { - Set nodes = new LinkedHashSet<>(); - if (podSets != null) { - for (StrimziPodSet podSet : podSets) { - nodes.addAll(ReconcilerUtils.nodesFromPodSet(podSet)); - } + List updatedPodSets = podSets + .stream() + .map(podSet -> WorkloadUtils.patchAnnotations( + podSet, + Map.of( + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, String.valueOf(clusterCa.caCertGeneration()), + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, String.valueOf(clusterCa.caKeyGeneration()), + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, String.valueOf(clientsCa.caCertGeneration()) + ))) + .toList(); + return strimziPodSetOperator.batchReconcile( + reconciliation, + reconciliation.namespace(), + updatedPodSets, + selectorLabels + ).map(i -> updatedPodSets.stream() + .flatMap(podSet -> ReconcilerUtils.nodesFromPodSet(podSet).stream()) + .collect(Collectors.toSet())); + } else { + return Future.succeededFuture(Set.of()); } - - return Future.succeededFuture(nodes); }); + } - /* test */ Future rollKafkaBrokers(Set nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) { + /* test */ Future rollKafka(Set nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) { return new KafkaRoller( reconciliation, vertx, diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/WorkloadUtilsTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/WorkloadUtilsTest.java index 369a7a437d..24cf20c80a 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/WorkloadUtilsTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/WorkloadUtilsTest.java @@ -36,6 +36,7 @@ import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder; import io.strimzi.api.kafka.model.kafka.Storage; import io.strimzi.api.kafka.model.podset.StrimziPodSet; +import io.strimzi.api.kafka.model.podset.StrimziPodSetBuilder; import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.model.Labels; import org.junit.jupiter.api.Test; @@ -363,6 +364,44 @@ public void testCreateStrimziPodSetFromNodeReferencesWithTemplate() { assertThat(sps.getSpec().getPods().stream().map(pod -> PodSetUtils.mapToPod(pod).getMetadata().getName()).toList(), hasItems("my-cluster-nodes-10", "my-cluster-nodes-11", "my-cluster-nodes-12")); } + @Test + public void testPatchPodAnnotations() { + Map annotations = Map.of("anno-1", "value-1", "anno-2", "value-2", "anno-3", "value-3"); + List pods = new ArrayList<>(); + pods.add(new PodBuilder() + .withNewMetadata() + .withName("pod-0") + .withNamespace(NAMESPACE) + .withAnnotations(annotations) + .endMetadata() + .build() + ); + pods.add(new PodBuilder() + .withNewMetadata() + .withName("pod-1") + .withNamespace(NAMESPACE) + .withAnnotations(annotations) + .endMetadata() + .build() + ); + + StrimziPodSet sps = new StrimziPodSetBuilder() + .withNewMetadata() + .withName("my-sps") + .withNamespace(NAMESPACE) + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(pods)) + .endSpec() + .build(); + + List resultPods = PodSetUtils.podSetToPods(WorkloadUtils.patchAnnotations(sps, Map.of("anno-2", "value-2a", "anno-4", "value-4"))); + assertThat(resultPods.size(), is(2)); + Map expectedAnnotations = Map.of("anno-1", "value-1", "anno-2", "value-2a", "anno-3", "value-3", "anno-4", "value-4"); + assertThat(resultPods.get(0).getMetadata().getAnnotations(), is(expectedAnnotations)); + assertThat(resultPods.get(1).getMetadata().getAnnotations(), is(expectedAnnotations)); + } + ////////////////////////////////////////////////// // Stateful Pod tests ////////////////////////////////////////////////// diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CaReconcilerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CaReconcilerTest.java index 7331b7ddda..39f7745d4e 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CaReconcilerTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CaReconcilerTest.java @@ -19,6 +19,8 @@ import io.strimzi.api.kafka.model.kafka.KafkaResources; import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; +import io.strimzi.api.kafka.model.podset.StrimziPodSet; +import io.strimzi.api.kafka.model.podset.StrimziPodSetBuilder; import io.strimzi.certs.CertAndKey; import io.strimzi.certs.CertManager; import io.strimzi.certs.OpenSslCertManager; @@ -29,6 +31,7 @@ import io.strimzi.operator.cluster.model.AbstractModel; import io.strimzi.operator.cluster.model.ModelUtils; import io.strimzi.operator.cluster.model.NodeRef; +import io.strimzi.operator.cluster.model.PodSetUtils; import io.strimzi.operator.cluster.model.RestartReason; import io.strimzi.operator.cluster.model.RestartReasons; import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; @@ -76,7 +79,6 @@ import java.util.ArrayList; import java.util.Base64; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -1529,22 +1531,48 @@ public void testRollingReasonsWithClusterCAKeyNotTrusted(Vertx vertx, VertxTestC StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator; when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture()); + List pods = new ArrayList<>(); + // adding a terminating Cruise Control pod to test that it's skipped during the key generation check + Pod ccPod = podWithNameAndAnnotations("my-cluster-cruise-control", false, false, generationAnnotations); + ccPod.getMetadata().setDeletionTimestamp("2023-06-08T16:23:18Z"); + pods.add(ccPod); + + // adding Kafka pods with old CA cert and key generation + List controllerPods = new ArrayList<>(); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); + pods.addAll(controllerPods); + + List brokerPods = new ArrayList<>(); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); + pods.addAll(brokerPods); + + StrimziPodSet controllerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-controller") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(controllerPods)) + .endSpec() + .build(); + + StrimziPodSet brokerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-broker") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(brokerPods)) + .endSpec() + .build(); + PodOperator mockPodOps = supplier.podOperations; - when(mockPodOps.listAsync(any(), any(Labels.class))).thenAnswer(i -> { - List pods = new ArrayList<>(); - // adding a terminating Cruise Control pod to test that it's skipped during the key generation check - Pod ccPod = podWithNameAndAnnotations("my-cluster-cruise-control", false, false, generationAnnotations); - ccPod.getMetadata().setDeletionTimestamp("2023-06-08T16:23:18Z"); - pods.add(ccPod); - // adding Kafka pods with old CA cert and key generation - pods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); - return Future.succeededFuture(pods); - }); + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods)); + + when(spsOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of(controllerPodSet, brokerPodSet))); + when(spsOps.batchReconcile(any(), eq(NAMESPACE), any(), any(Labels.class))).thenReturn(Future.succeededFuture()); Map deps = new HashMap<>(); deps.put("my-cluster-entity-operator", deploymentWithName("my-cluster-entity-operator")); @@ -1570,6 +1598,114 @@ public void testRollingReasonsWithClusterCAKeyNotTrusted(Vertx vertx, VertxTestC }))); } + @Test + public void testCertAnnotationsPatchedWithClusterCAKeyNotTrusted(Vertx vertx, VertxTestContext context) { + Kafka kafka = new KafkaBuilder(KAFKA) + .editSpec() + .withNewEntityOperator() + .endEntityOperator() + .withNewCruiseControl() + .endCruiseControl() + .withNewKafkaExporter() + .endKafkaExporter() + .endSpec() + .build(); + + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME); + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + SecretOperator secretOps = supplier.secretOperations; + ArgumentCaptor clusterCaCert = ArgumentCaptor.forClass(Secret.class); + ArgumentCaptor clusterCaKey = ArgumentCaptor.forClass(Secret.class); + when(secretOps.reconcile(any(), eq(NAMESPACE), eq(AbstractModel.clusterCaCertSecretName(NAME)), clusterCaCert.capture())).thenAnswer(i -> { + Secret s = clusterCaCert.getValue(); + s.getMetadata().setAnnotations(Map.of(Ca.ANNO_STRIMZI_IO_CA_CERT_GENERATION, "1")); + return Future.succeededFuture(ReconcileResult.created(s)); + }); + when(secretOps.reconcile(any(), eq(NAMESPACE), eq(AbstractModel.clusterCaKeySecretName(NAME)), clusterCaKey.capture())).thenAnswer(i -> { + Secret s = clusterCaKey.getValue(); + s.getMetadata().setAnnotations(Map.of(Ca.ANNO_STRIMZI_IO_CA_KEY_GENERATION, "1")); + return Future.succeededFuture(ReconcileResult.created(s)); + }); + when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaCertificateSecretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0)))); + when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaKeySecretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0)))); + when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0)))); + when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of())); + + Map generationAnnotations = + Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "0"); + + StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator; + when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture()); + + List pods = new ArrayList<>(); + + // adding Kafka pods with old CA cert and key generation + List controllerPods = new ArrayList<>(); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); + pods.addAll(controllerPods); + + List brokerPods = new ArrayList<>(); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); + pods.addAll(brokerPods); + + StrimziPodSet controllerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-controller") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(controllerPods)) + .endSpec() + .build(); + + StrimziPodSet brokerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-broker") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(brokerPods)) + .endSpec() + .build(); + + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods)); + + when(spsOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of(controllerPodSet, brokerPodSet))); + + DeploymentOperator depsOperator = supplier.deploymentOperations; + when(depsOperator.getAsync(any(), any())).thenReturn(Future.succeededFuture(null)); + + Checkpoint async = context.checkpoint(2); + + when(spsOps.batchReconcile(any(), eq(NAMESPACE), any(), any(Labels.class))).thenAnswer(i -> { + List podSets = i.getArgument(2); + context.verify(() -> { + assertThat(podSets, hasSize(2)); + List returnedPods = podSets + .stream() + .flatMap(podSet -> PodSetUtils.podSetToPods(podSet).stream()) + .toList(); + for (Pod pod : returnedPods) { + Map podAnnotations = pod.getMetadata().getAnnotations(); + assertThat(podAnnotations, hasEntry(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "1")); + assertThat(podAnnotations, hasEntry(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "1")); + } + }); + async.flag(); + return Future.succeededFuture(); + }); + + MockCaReconciler mockCaReconciler = new MockCaReconciler(reconciliation, kafka, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build(), + supplier, vertx, CERT_MANAGER, PASSWORD_GENERATOR); + mockCaReconciler + .reconcile(Clock.systemUTC()) + .onComplete(context.succeeding(c -> async.flag())); + } + static class MockCaReconciler extends CaReconciler { RestartReasons kPodRollReasons; List deploymentRollReason = new ArrayList<>(); @@ -1587,19 +1723,7 @@ Future verifyClusterCaFullyTrustedAndUsed() { } @Override - Future> getKafkaReplicas() { - Set nodes = new HashSet<>(); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-brokers-0", true, false))); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-brokers-1", true, false))); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-brokers-2", true, false))); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-controllers-3", true, false))); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-controllers-4", true, false))); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-controllers-5", true, false))); - return Future.succeededFuture(nodes); - } - - @Override - Future rollKafkaBrokers(Set nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) { + Future rollKafka(Set nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) { this.kPodRollReasons = podRollReasons; return Future.succeededFuture(); } @@ -1621,11 +1745,7 @@ Future maybeRemoveOldClusterCaCertificates() { } } - public static Pod podWithName(String name, boolean broker, boolean controller) { - return podWithNameAndAnnotations(name, broker, controller, Map.of()); - } - - public static Pod podWithNameAndAnnotations(String name, boolean broker, boolean controller, Map annotations) { + private static Pod podWithNameAndAnnotations(String name, boolean broker, boolean controller, Map annotations) { return new PodBuilder() .withNewMetadata() .withName(name) @@ -1639,7 +1759,7 @@ public static Pod podWithNameAndAnnotations(String name, boolean broker, boolean .build(); } - public static Deployment deploymentWithName(String name) { + private static Deployment deploymentWithName(String name) { return new DeploymentBuilder() .withNewMetadata() .withName(name) diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CaReconcilerZooBasedTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CaReconcilerZooBasedTest.java index 44de51b139..a41182b5d3 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CaReconcilerZooBasedTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CaReconcilerZooBasedTest.java @@ -47,7 +47,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -265,16 +264,13 @@ Future maybeRollZookeeper(int replicas, RestartReasons podRestartReasons, } @Override - Future> getKafkaReplicas() { - Set nodes = new HashSet<>(); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-kafka-kafka-0"))); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-kafka-kafka-1"))); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-kafka-kafka-2"))); - return Future.succeededFuture(nodes); + Future> patchCaGenerationAndReturnNodes() { + //Response is ignored by rollKafka method in the mock + return Future.succeededFuture(Set.of()); } @Override - Future rollKafkaBrokers(Set nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) { + Future rollKafka(Set nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) { this.kPodRollReasons = podRollReasons; return Future.succeededFuture(); }