Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Annotate StrimziPodSets before rolling during CA renewal #10711

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* after a scaling up, the operator triggers an auto-rebalancing for moving some of the existing partitions to the newly added brokers.
* before scaling down, and if the brokers to remove are hosting partitions, the operator triggers an auto-rebalancing to these partitions off the brokers to make them free to be removed.
* Strimzi Access Operator 0.1.0 added to the installation files and examples
* Patch the StrimziPodSets before rolling the pods during CA certificate renewal.
katheris marked this conversation as resolved.
Show resolved Hide resolved

### Changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,36 @@ public static StrimziPodSet createPodSet(
.build();
}

/**
* Patch a Strimzi PodSet to merge the provided annotations with the annotations on the Pod resources defined
* in the PodSet
*
* @param strimziPodSet Strimzi PodSet to patch
* @param annotations Annotations to merge with the existing annotations
*
* @return Patched PodSet
*/
public static StrimziPodSet patchAnnotations(StrimziPodSet strimziPodSet, Map<String, String> annotations) {
katheris marked this conversation as resolved.
Show resolved Hide resolved
List<Map<String, Object>> newPods = PodSetUtils.podSetToPods(strimziPodSet)
.stream()
.map(pod -> {
Map<String, String> updatedAnnotations = pod.getMetadata().getAnnotations();
updatedAnnotations.putAll(annotations);
return pod.edit()
.editMetadata()
.withAnnotations(updatedAnnotations)
katheris marked this conversation as resolved.
Show resolved Hide resolved
.endMetadata()
.build();
})
.map(PodSetUtils::podToMap)
.toList();
return new StrimziPodSetBuilder(strimziPodSet)
.editSpec()
.withPods(newPods)
.endSpec()
.build();
}

/**
* Creates a stateful Pod for use with StrimziPodSets. Stateful in this context means that it has a stable name and
* typically uses storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.cluster.model.RestartReason;
import io.strimzi.operator.cluster.model.RestartReasons;
import io.strimzi.operator.cluster.model.WorkloadUtils;
import io.strimzi.operator.cluster.operator.resource.KafkaAgentClientProvider;
import io.strimzi.operator.cluster.operator.resource.KafkaRoller;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
Expand Down Expand Up @@ -395,8 +396,8 @@ Future<Void> rollingUpdateForNewCaKey() {
TlsPemIdentity coTlsPemIdentity = new TlsPemIdentity(new PemTrustSet(clusterCa.caCertSecret()), PemAuthIdentity.clusterOperator(coSecret));
return getZooKeeperReplicas()
.compose(replicas -> maybeRollZookeeper(replicas, podRollReasons, coTlsPemIdentity))
.compose(i -> getKafkaReplicas())
.compose(nodes -> rollKafkaBrokers(nodes, podRollReasons, coTlsPemIdentity))
.compose(i -> updateKafkaPodCertAnnotations())
.compose(podSets -> rollKafka(podSets, podRollReasons, coTlsPemIdentity))
.compose(i -> maybeRollDeploymentIfExists(KafkaResources.entityOperatorDeploymentName(reconciliation.name()), podRollReasons))
.compose(i -> maybeRollDeploymentIfExists(KafkaExporterResources.componentName(reconciliation.name()), podRollReasons))
.compose(i -> maybeRollDeploymentIfExists(CruiseControlResources.componentName(reconciliation.name()), podRollReasons));
Expand Down Expand Up @@ -527,27 +528,42 @@ Future<Void> rollingUpdateForNewCaKey() {
}
}

/* test */ Future<Set<NodeRef>> getKafkaReplicas() {
/**
 * Lists the StrimziPodSets of this cluster's Kafka nodes and patches the Pod templates inside them
 * with the current cluster CA certificate/key generation and clients CA certificate generation
 * annotations, then reconciles the updated PodSets so the annotations are persisted before the
 * pods are rolled.
 *
 * @return  Future completing with the list of patched StrimziPodSets (empty list when none are found)
 */
/* test */ Future<List<StrimziPodSet>> updateKafkaPodCertAnnotations() {
katheris marked this conversation as resolved.
Show resolved Hide resolved
// Selector matching the Kafka StrimziPodSets belonging to this reconciliation's cluster
Labels selectorLabels = Labels.EMPTY
.withStrimziKind(reconciliation.kind())
.withStrimziCluster(reconciliation.name())
.withStrimziName(KafkaResources.kafkaComponentName(reconciliation.name()));

return strimziPodSetOperator.listAsync(reconciliation.namespace(), selectorLabels)
.compose(podSets -> {
if (podSets != null) {
// Merge the current CA generation annotations into every Pod template of every PodSet
List<StrimziPodSet> updatedPodSets = podSets
.stream()
.map(podSet -> WorkloadUtils.patchAnnotations(
podSet,
Map.of(
Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, String.valueOf(clusterCa.caCertGeneration()),
Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, String.valueOf(clusterCa.caKeyGeneration()),
Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, String.valueOf(clientsCa.caCertGeneration())
)))
.toList();
// Apply the patched PodSets to the cluster, then pass them on for the rolling update
return strimziPodSetOperator.batchReconcile(
reconciliation,
reconciliation.namespace(),
updatedPodSets,
selectorLabels
).map(i -> updatedPodSets);
katheris marked this conversation as resolved.
Show resolved Hide resolved
} else {
// No PodSets found => nothing to annotate or roll
return Future.succeededFuture(new ArrayList<>());
}

});

}

/* test */ Future<Void> rollKafkaBrokers(Set<NodeRef> nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
/* test */ Future<Void> rollKafka(List<StrimziPodSet> podSets, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
Set<NodeRef> nodes = new LinkedHashSet<>();
for (StrimziPodSet podSet : podSets) {
nodes.addAll(ReconcilerUtils.nodesFromPodSet(podSet));
}
return new KafkaRoller(
reconciliation,
vertx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder;
import io.strimzi.api.kafka.model.kafka.Storage;
import io.strimzi.api.kafka.model.podset.StrimziPodSet;
import io.strimzi.api.kafka.model.podset.StrimziPodSetBuilder;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.Labels;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -363,6 +364,44 @@ public void testCreateStrimziPodSetFromNodeReferencesWithTemplate() {
assertThat(sps.getSpec().getPods().stream().map(pod -> PodSetUtils.mapToPod(pod).getMetadata().getName()).toList(), hasItems("my-cluster-nodes-10", "my-cluster-nodes-11", "my-cluster-nodes-12"));
}

@Test
public void testPatchPodAnnotations() {
    // Two pods that both start with the same three annotations
    Map<String, String> initialAnnotations = Map.of("anno-1", "value-1", "anno-2", "value-2", "anno-3", "value-3");

    List<Pod> originalPods = List.of(
            new PodBuilder()
                    .withNewMetadata()
                        .withName("pod-0")
                        .withNamespace(NAMESPACE)
                        .withAnnotations(initialAnnotations)
                    .endMetadata()
                    .build(),
            new PodBuilder()
                    .withNewMetadata()
                        .withName("pod-1")
                        .withNamespace(NAMESPACE)
                        .withAnnotations(initialAnnotations)
                    .endMetadata()
                    .build());

    StrimziPodSet podSet = new StrimziPodSetBuilder()
            .withNewMetadata()
                .withName("my-sps")
                .withNamespace(NAMESPACE)
            .endMetadata()
            .withNewSpec()
                .withPods(PodSetUtils.podsToMaps(originalPods))
            .endSpec()
            .build();

    // Patch with one overriding annotation (anno-2) and one brand new annotation (anno-4)
    StrimziPodSet patchedPodSet = WorkloadUtils.patchAnnotations(podSet, Map.of("anno-2", "value-2a", "anno-4", "value-4"));
    List<Pod> patchedPods = PodSetUtils.podSetToPods(patchedPodSet);

    // Existing annotations are kept, the clashing one is overridden, the new one is added
    Map<String, String> expectedAnnotations = Map.of("anno-1", "value-1", "anno-2", "value-2a", "anno-3", "value-3", "anno-4", "value-4");
    assertThat(patchedPods.size(), is(2));
    for (Pod pod : patchedPods) {
        assertThat(pod.getMetadata().getAnnotations(), is(expectedAnnotations));
    }
}

//////////////////////////////////////////////////
// Stateful Pod tests
//////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
import io.strimzi.api.kafka.model.podset.StrimziPodSet;
import io.strimzi.api.kafka.model.podset.StrimziPodSetBuilder;
import io.strimzi.certs.CertAndKey;
import io.strimzi.certs.CertManager;
import io.strimzi.certs.OpenSslCertManager;
Expand All @@ -28,7 +30,7 @@
import io.strimzi.operator.cluster.ResourceUtils;
import io.strimzi.operator.cluster.model.AbstractModel;
import io.strimzi.operator.cluster.model.ModelUtils;
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.cluster.model.PodSetUtils;
import io.strimzi.operator.cluster.model.RestartReason;
import io.strimzi.operator.cluster.model.RestartReasons;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
Expand Down Expand Up @@ -76,7 +78,6 @@
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -1529,22 +1530,33 @@ public void testRollingReasonsWithClusterCAKeyNotTrusted(Vertx vertx, VertxTestC
StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator;
when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture());

List<Pod> pods = new ArrayList<>();
// adding a terminating Cruise Control pod to test that it's skipped during the key generation check
Pod ccPod = podWithNameAndAnnotations("my-cluster-cruise-control", false, false, generationAnnotations);
ccPod.getMetadata().setDeletionTimestamp("2023-06-08T16:23:18Z");
pods.add(ccPod);
// adding Kafka pods with old CA cert and key generation
pods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations));
pods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations));
pods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations));
pods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations));
pods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations));
pods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations));

StrimziPodSet strimziPodSet = new StrimziPodSetBuilder()
.withNewMetadata()
.withName(NAME)
.endMetadata()
.withNewSpec()
.withPods(PodSetUtils.podsToMaps(pods))
.endSpec()
.build();
katheris marked this conversation as resolved.
Show resolved Hide resolved

PodOperator mockPodOps = supplier.podOperations;
when(mockPodOps.listAsync(any(), any(Labels.class))).thenAnswer(i -> {
List<Pod> pods = new ArrayList<>();
// adding a terminating Cruise Control pod to test that it's skipped during the key generation check
Pod ccPod = podWithNameAndAnnotations("my-cluster-cruise-control", false, false, generationAnnotations);
ccPod.getMetadata().setDeletionTimestamp("2023-06-08T16:23:18Z");
pods.add(ccPod);
// adding Kafka pods with old CA cert and key generation
pods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations));
pods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations));
pods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations));
pods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations));
pods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations));
pods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations));
return Future.succeededFuture(pods);
});
when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods));

when(spsOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of(strimziPodSet)));
when(spsOps.batchReconcile(any(), eq(NAMESPACE), any(), any(Labels.class))).thenReturn(Future.succeededFuture());

Map<String, Deployment> deps = new HashMap<>();
deps.put("my-cluster-entity-operator", deploymentWithName("my-cluster-entity-operator"));
Expand All @@ -1560,6 +1572,14 @@ public void testRollingReasonsWithClusterCAKeyNotTrusted(Vertx vertx, VertxTestC
mockCaReconciler
.reconcile(Clock.systemUTC())
.onComplete(context.succeeding(c -> context.verify(() -> {
List<StrimziPodSet> podSets = mockCaReconciler.podSets;
assertThat(podSets, hasSize(1));
List<Pod> returnedPods = PodSetUtils.podSetToPods(podSets.get(0));
for (Pod pod : returnedPods) {
Map<String, String> podAnnotations = pod.getMetadata().getAnnotations();
assertThat(podAnnotations, hasEntry(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "1"));
assertThat(podAnnotations, hasEntry(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "1"));
}
assertThat(mockCaReconciler.isClusterCaNeedFullTrust, is(true));
assertThat(mockCaReconciler.kPodRollReasons.contains(RestartReason.CLUSTER_CA_CERT_KEY_REPLACED), is(true));
assertThat(mockCaReconciler.deploymentRollReason.size() == 3, is(true));
Expand All @@ -1571,6 +1591,7 @@ public void testRollingReasonsWithClusterCAKeyNotTrusted(Vertx vertx, VertxTestC
}

static class MockCaReconciler extends CaReconciler {
List<StrimziPodSet> podSets;
RestartReasons kPodRollReasons;
List<String> deploymentRollReason = new ArrayList<>();

Expand All @@ -1587,19 +1608,8 @@ Future<Void> verifyClusterCaFullyTrustedAndUsed() {
}

@Override
Future<Set<NodeRef>> getKafkaReplicas() {
Set<NodeRef> nodes = new HashSet<>();
nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-brokers-0", true, false)));
nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-brokers-1", true, false)));
nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-brokers-2", true, false)));
nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-controllers-3", true, false)));
nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-controllers-4", true, false)));
nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-controllers-5", true, false)));
return Future.succeededFuture(nodes);
}

@Override
Future<Void> rollKafkaBrokers(Set<NodeRef> nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
// Captures the patched PodSets and the restart reasons for later verification by the test;
// skips the real Kafka rolling update entirely.
Future<Void> rollKafka(List<StrimziPodSet> podSets, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
this.podSets = podSets;
this.kPodRollReasons = podRollReasons;
return Future.succeededFuture();
}
Expand All @@ -1621,11 +1631,7 @@ Future<Void> maybeRemoveOldClusterCaCertificates() {
}
}

public static Pod podWithName(String name, boolean broker, boolean controller) {
return podWithNameAndAnnotations(name, broker, controller, Map.of());
}

public static Pod podWithNameAndAnnotations(String name, boolean broker, boolean controller, Map<String, String> annotations) {
private static Pod podWithNameAndAnnotations(String name, boolean broker, boolean controller, Map<String, String> annotations) {
return new PodBuilder()
.withNewMetadata()
.withName(name)
Expand All @@ -1639,7 +1645,7 @@ public static Pod podWithNameAndAnnotations(String name, boolean broker, boolean
.build();
}

public static Deployment deploymentWithName(String name) {
private static Deployment deploymentWithName(String name) {
return new DeploymentBuilder()
.withNewMetadata()
.withName(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaBuilder;
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.api.kafka.model.podset.StrimziPodSet;
import io.strimzi.certs.CertManager;
import io.strimzi.certs.OpenSslCertManager;
import io.strimzi.operator.cluster.ClusterOperatorConfig;
import io.strimzi.operator.cluster.KafkaVersionTestUtils;
import io.strimzi.operator.cluster.ResourceUtils;
import io.strimzi.operator.cluster.model.AbstractModel;
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.cluster.model.RestartReason;
import io.strimzi.operator.cluster.model.RestartReasons;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
Expand Down Expand Up @@ -47,10 +47,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -265,16 +263,13 @@ Future<Void> maybeRollZookeeper(int replicas, RestartReasons podRestartReasons,
}

@Override
Future<Set<NodeRef>> getKafkaReplicas() {
Set<NodeRef> nodes = new HashSet<>();
nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-kafka-kafka-0")));
nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-kafka-kafka-1")));
nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-kafka-kafka-2")));
return Future.succeededFuture(nodes);
Future<List<StrimziPodSet>> updateKafkaPodCertAnnotations() {
// Response is ignored by the rollKafka mock below, so an empty list is sufficient here
return Future.succeededFuture(List.of());
}

@Override
Future<Void> rollKafkaBrokers(Set<NodeRef> nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
// Captures the restart reasons for later verification by the test; skips the real rolling update.
Future<Void> rollKafka(List<StrimziPodSet> podSets, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
this.kPodRollReasons = podRollReasons;
return Future.succeededFuture();
}
Expand Down
Loading