diff --git a/api/v1/cluster_types.go b/api/v1/cluster_types.go index 2eb7b6c0a8..d7987be5af 100644 --- a/api/v1/cluster_types.go +++ b/api/v1/cluster_types.go @@ -1084,8 +1084,15 @@ type PgBouncerIntegrationStatus struct { // ReplicaClusterConfiguration encapsulates the configuration of a replica // cluster -// +kubebuilder:validation:XValidation:rule="!has(self.promotionToken) || size(self.promotionToken) == 0 || !self.enabled",message=Promotion token must be empty on replica clusters type ReplicaClusterConfiguration struct { + // Self defines the name of this cluster. It is used to determine if this is a primary + // or a replica cluster, comparing it with `primary` + Self string `json:"self,omitempty"` + + // Primary defines which Cluster is defined to be the primary in the distributed PostgreSQL cluster, based on the + // topology specified in externalClusters + Primary string `json:"primary,omitempty"` + // The name of the external cluster which is the replication origin // +kubebuilder:validation:MinLength=1 Source string `json:"source"` @@ -1094,7 +1101,7 @@ type ReplicaClusterConfiguration struct { // existing cluster. Replica cluster can be created from a recovery // object store or via streaming through pg_basebackup. // Refer to the Replica clusters page of the documentation for more information. - Enabled bool `json:"enabled"` + Enabled *bool `json:"enabled,omitempty"` // A demotion token generated by an external cluster used to // check if the promotion requirements are met. @@ -3243,7 +3250,30 @@ func (cluster Cluster) ExternalCluster(name string) (ExternalCluster, bool) { // IsReplica checks if this is a replica cluster or not func (cluster Cluster) IsReplica() bool { - return cluster.Spec.ReplicaCluster != nil && cluster.Spec.ReplicaCluster.Enabled + // Before introducing the "primary" field, the + // "enabled" parameter was declared as a "boolean" + // and was not declared "omitempty". + // + // Legacy replica clusters will have the "replica" stanza + // and the "enabled" field set explicitly to true. + // + // The following code is designed to not change the + // previous semantics. + r := cluster.Spec.ReplicaCluster + if r == nil { + return false + } + + if r.Enabled != nil { + return *r.Enabled + } + + clusterName := r.Self + if len(clusterName) == 0 { + clusterName = cluster.Name + } + + return clusterName != r.Primary } var slotNameNegativeRegex = regexp.MustCompile("[^a-z0-9_]+") diff --git a/api/v1/cluster_types_test.go b/api/v1/cluster_types_test.go index b56f863187..c648e56aa6 100644 --- a/api/v1/cluster_types_test.go +++ b/api/v1/cluster_types_test.go @@ -788,7 +788,7 @@ var _ = Describe("Barman Endpoint CA for replica cluster", func() { Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ Source: "testSource", - Enabled: true, + Enabled: ptr.To(true), }, }, } @@ -817,7 +817,7 @@ var _ = Describe("Barman Endpoint CA for replica cluster", func() { }, ReplicaCluster: &ReplicaClusterConfiguration{ Source: "testReplica", - Enabled: true, + Enabled: ptr.To(true), }, }, } @@ -1008,7 +1008,7 @@ var _ = Describe("Cluster ShouldRecoveryCreateApplicationDatabase", func() { }) It("should return false if the cluster is a replica", func() { - cluster.Spec.ReplicaCluster = &ReplicaClusterConfiguration{Enabled: true} + cluster.Spec.ReplicaCluster = &ReplicaClusterConfiguration{Enabled: ptr.To(true)} result := cluster.ShouldRecoveryCreateApplicationDatabase() Expect(result).To(BeFalse()) }) @@ -1238,7 +1238,7 @@ var _ = Describe("ShouldPromoteFromReplicaCluster", func() { cluster := &Cluster{ Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), PromotionToken: "ABC", }, }, @@ -1250,7 +1250,7 @@ var _ = Describe("ShouldPromoteFromReplicaCluster", func() { cluster := &Cluster{ Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), }, }, } @@ -1270,7 +1270,7 @@ var _ = Describe("ShouldPromoteFromReplicaCluster", func() { cluster := &Cluster{ Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), PromotionToken: "ABC", }, }, @@ -1285,7 +1285,7 @@ var _ = Describe("ShouldPromoteFromReplicaCluster", func() { cluster := &Cluster{ Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), PromotionToken: "ABC", }, }, @@ -1296,3 +1296,125 @@ var _ = Describe("ShouldPromoteFromReplicaCluster", func() { Expect(cluster.ShouldPromoteFromReplicaCluster()).To(BeTrue()) }) }) + +var _ = Describe("IsReplica", func() { + Describe("using the legacy API", func() { + replicaClusterOldAPI := &Cluster{ + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Enabled: ptr.To(true), + Source: "source-cluster", + }, + }, + } + + primaryClusterOldAPI := &Cluster{ + Spec: ClusterSpec{ + ReplicaCluster: nil, + }, + } + + primaryClusterOldAPIExplicit := &Cluster{ + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Enabled: ptr.To(false), + Source: "source-cluster", + }, + }, + } + + DescribeTable( + "doesn't change the semantics", + func(resource *Cluster, isReplica bool) { + Expect(resource.IsReplica()).To(Equal(isReplica)) + }, + Entry( + "replica cluster with the old API", + replicaClusterOldAPI, true), + Entry( + "primary cluster with the old API", + primaryClusterOldAPI, false), + Entry( + "primary cluster with the old API, explicitly disabling replica", + primaryClusterOldAPIExplicit, false), + ) + }) + + Describe("using the new API, with an implicit self", func() { + primaryClusterNewAPI := &Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster-1", + }, + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Primary: "cluster-1", + Enabled: nil, + Source: "source-cluster", + }, + }, + } + + replicaClusterNewAPI := &Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster-1", + }, + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Primary: "cluster-2", + Enabled: nil, + Source: "source-cluster", + }, + }, + } + + DescribeTable( + "uses the primary cluster name", + func(resource *Cluster, isReplica bool) { + Expect(resource.IsReplica()).To(Equal(isReplica)) + }, + Entry( + "primary cluster", + primaryClusterNewAPI, false), + Entry( + "replica cluster", + replicaClusterNewAPI, true), + ) + }) + + Describe("using the new API, with an explicit self", func() { + primaryClusterNewAPI := &Cluster{ + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Self: "cluster-1", + Primary: "cluster-1", + Enabled: nil, + Source: "source-cluster", + }, + }, + } + + replicaClusterNewAPI := &Cluster{ + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Self: "cluster-1", + Primary: "cluster-2", + Enabled: nil, + Source: "source-cluster", + }, + }, + } + + DescribeTable( + "uses the primary cluster name", + func(resource *Cluster, isReplica bool) { + Expect(resource.IsReplica()).To(Equal(isReplica)) + }, + Entry( + "primary cluster", + primaryClusterNewAPI, false), + Entry( + "replica cluster", + replicaClusterNewAPI, true), + ) + }) +}) diff --git a/api/v1/cluster_webhook.go b/api/v1/cluster_webhook.go index e3a5079980..83af8c8ce9 100644 --- a/api/v1/cluster_webhook.go +++ b/api/v1/cluster_webhook.go @@ -1933,25 +1933,38 @@ func (r *Cluster) validatePromotionToken() field.ErrorList { return result } - if !r.Spec.ReplicaCluster.Enabled { - token := r.Spec.ReplicaCluster.PromotionToken - if len(token) > 0 { - tokenContent, err := utils.ParsePgControldataToken(token) - if err != nil { - result = append( - result, - field.Invalid( - field.NewPath("spec", "replicaCluster", "token"), - token, - fmt.Sprintf("Invalid promotionToken format: %s", err.Error()))) - } else if err := tokenContent.IsValid(); err != nil { - result = append( - result, - field.Invalid( - field.NewPath("spec", "replicaCluster", "token"), - token, - fmt.Sprintf("Invalid promotionToken content: %s", err.Error()))) - } + token := r.Spec.ReplicaCluster.PromotionToken + // Nothing to validate if the token is empty, we can immediately return + if len(token) == 0 { + return result + } + + if r.IsReplica() { + result = append( + result, + field.Invalid( + field.NewPath("spec", "replicaCluster", "token"), + token, + "promotionToken is only allowed for primary clusters")) + return result + } + + if !r.IsReplica() { + tokenContent, err := utils.ParsePgControldataToken(token) + if err != nil { + result = append( + result, + field.Invalid( + field.NewPath("spec", "replicaCluster", "token"), + token, + fmt.Sprintf("Invalid promotionToken format: %s", err.Error()))) + } else if err := tokenContent.IsValid(); err != nil { + result = append( + result, + field.Invalid( + field.NewPath("spec", "replicaCluster", "token"), + token, + fmt.Sprintf("Invalid promotionToken content: %s", err.Error()))) } } return result @@ -1962,34 +1975,86 @@ func (r *Cluster) validatePromotionToken() field.ErrorList { func (r *Cluster) validateReplicaMode() field.ErrorList { var result field.ErrorList - if r.Spec.ReplicaCluster == nil || !r.Spec.ReplicaCluster.Enabled { + replicaClusterConf := r.Spec.ReplicaCluster + if replicaClusterConf == nil { return result } - if r.Spec.Bootstrap == nil { - result = append(result, field.Invalid( - field.NewPath("spec", "bootstrap"), - r.Spec.ReplicaCluster, - "bootstrap configuration is required for replica mode")) - } else if r.Spec.Bootstrap.PgBaseBackup == nil && r.Spec.Bootstrap.Recovery == nil && - // this is needed because we only want to validate this during cluster creation, currently if we would have - // to enable this logic only during creation and not cluster changes it would require a meaningful refactor - len(r.ObjectMeta.ResourceVersion) == 0 { + // Having enabled set to "true" means that the automatic mode is not active. + // The "primary" field is used only when the automatic mode is active. + // This implies that hasEnabled and hasPrimary are mutually exclusive + hasEnabled := replicaClusterConf.Enabled != nil + hasPrimary := len(replicaClusterConf.Primary) > 0 + if hasPrimary && hasEnabled { result = append(result, field.Invalid( - field.NewPath("spec", "replicaCluster"), - r.Spec.ReplicaCluster, - "replica mode bootstrap is compatible only with pg_basebackup or recovery")) + field.NewPath("spec", "replicaCluster", "enabled"), + replicaClusterConf, + "replica mode enabled is not compatible with the primary field")) } - _, found := r.ExternalCluster(r.Spec.ReplicaCluster.Source) + + if r.IsReplica() { + if r.Spec.Bootstrap == nil { + result = append(result, field.Invalid( + field.NewPath("spec", "bootstrap"), + replicaClusterConf, + "bootstrap configuration is required for replica mode")) + } else if r.Spec.Bootstrap.PgBaseBackup == nil && r.Spec.Bootstrap.Recovery == nil && + // this is needed because we only want to validate this during cluster creation, currently if we would have + // to enable this logic only during creation and not cluster changes it would require a meaningful refactor + len(r.ObjectMeta.ResourceVersion) == 0 { + result = append(result, field.Invalid( + field.NewPath("spec", "replicaCluster"), + replicaClusterConf, + "replica mode bootstrap is compatible only with pg_basebackup or recovery")) + } + } + + result = append(result, r.validateReplicaClusterExternalClusters()...) + + return result +} + +func (r *Cluster) validateReplicaClusterExternalClusters() field.ErrorList { + var result field.ErrorList + replicaClusterConf := r.Spec.ReplicaCluster + if replicaClusterConf == nil { + return result + } + + // Check that the externalCluster references are correct + _, found := r.ExternalCluster(replicaClusterConf.Source) if !found { result = append( result, field.Invalid( field.NewPath("spec", "replicaCluster", "primaryServerName"), - r.Spec.ReplicaCluster.Source, - fmt.Sprintf("External cluster %v not found", r.Spec.ReplicaCluster.Source))) + replicaClusterConf.Source, + fmt.Sprintf("External cluster %v not found", replicaClusterConf.Source))) + } + + if len(replicaClusterConf.Self) > 0 { + _, found := r.ExternalCluster(replicaClusterConf.Self) + if !found { + result = append( + result, + field.Invalid( + field.NewPath("spec", "replicaCluster", "self"), + replicaClusterConf.Self, + fmt.Sprintf("External cluster %v not found", replicaClusterConf.Self))) + } } + if len(replicaClusterConf.Primary) > 0 { + _, found := r.ExternalCluster(replicaClusterConf.Primary) + if !found { + result = append( + result, + field.Invalid( + field.NewPath("spec", "replicaCluster", "primary"), + replicaClusterConf.Primary, + fmt.Sprintf("External cluster %v not found", replicaClusterConf.Primary))) + } + } return result } diff --git a/api/v1/cluster_webhook_test.go b/api/v1/cluster_webhook_test.go index 0d48634bce..6719f43142 100644 --- a/api/v1/cluster_webhook_test.go +++ b/api/v1/cluster_webhook_test.go @@ -1200,7 +1200,7 @@ var _ = Describe("configuration change validation", func() { Spec: ClusterSpec{ Instances: 1, ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), }, PostgresConfiguration: PostgresConfiguration{ Parameters: map[string]string{ @@ -2610,12 +2610,176 @@ var _ = Describe("unix permissions identifiers change validation", func() { }) }) +var _ = Describe("promotion token validation", func() { + It("complains if the replica token is not formatted in base64", func() { + cluster := &Cluster{ + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Enabled: ptr.To(false), + Source: "test", + PromotionToken: "this-is-a-wrong-token", + }, + Bootstrap: &BootstrapConfiguration{ + InitDB: &BootstrapInitDB{}, + }, + ExternalClusters: []ExternalCluster{ + { + Name: "test", + }, + }, + }, + } + + result := cluster.validatePromotionToken() + Expect(result).ToNot(BeEmpty()) + }) + + It("complains if the replica token is not valid", func() { + cluster := &Cluster{ + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Enabled: ptr.To(false), + Source: "test", + PromotionToken: base64.StdEncoding.EncodeToString([]byte("{}")), + }, + Bootstrap: &BootstrapConfiguration{ + InitDB: &BootstrapInitDB{}, + }, + ExternalClusters: []ExternalCluster{ + { + Name: "test", + }, + }, + }, + } + + result := cluster.validatePromotionToken() + Expect(result).ToNot(BeEmpty()) + }) + + It("doesn't complain if the replica token is valid", func() { + tokenContent := utils.PgControldataTokenContent{ + LatestCheckpointTimelineID: "3", + REDOWALFile: "this-wal-file", + DatabaseSystemIdentifier: "231231212", + LatestCheckpointREDOLocation: "33322232", + TimeOfLatestCheckpoint: "we don't know", + OperatorVersion: "version info", + } + jsonToken, err := json.Marshal(tokenContent) + Expect(err).ToNot(HaveOccurred()) + + cluster := &Cluster{ + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Enabled: ptr.To(false), + Source: "test", + PromotionToken: base64.StdEncoding.EncodeToString(jsonToken), + }, + Bootstrap: &BootstrapConfiguration{ + InitDB: &BootstrapInitDB{}, + }, + ExternalClusters: []ExternalCluster{ + { + Name: "test", + }, + }, + }, + } + + result := cluster.validatePromotionToken() + Expect(result).To(BeEmpty()) + }) + + It("complains if the token is set on a replica cluster (enabled)", func() { + tokenContent := utils.PgControldataTokenContent{ + LatestCheckpointTimelineID: "1", + REDOWALFile: "0000000100000001000000A1", + DatabaseSystemIdentifier: "231231212", + LatestCheckpointREDOLocation: "0/1000000", + TimeOfLatestCheckpoint: "we don't know", + OperatorVersion: "version info", + } + jsonToken, err := json.Marshal(tokenContent) + Expect(err).ToNot(HaveOccurred()) + + cluster := &Cluster{ + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Enabled: ptr.To(true), + Source: "test", + PromotionToken: base64.StdEncoding.EncodeToString(jsonToken), + }, + }, + } + + result := cluster.validatePromotionToken() + Expect(result).NotTo(BeEmpty()) + }) + + It("complains if the token is set on a replica cluster (primary, default name)", func() { + tokenContent := utils.PgControldataTokenContent{ + LatestCheckpointTimelineID: "1", + REDOWALFile: "0000000100000001000000A1", + DatabaseSystemIdentifier: "231231212", + LatestCheckpointREDOLocation: "0/1000000", + TimeOfLatestCheckpoint: "we don't know", + OperatorVersion: "version info", + } + jsonToken, err := json.Marshal(tokenContent) + Expect(err).ToNot(HaveOccurred()) + + cluster := &Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test2", + }, + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Primary: "test", + Source: "test", + PromotionToken: base64.StdEncoding.EncodeToString(jsonToken), + }, + }, + } + + result := cluster.validatePromotionToken() + Expect(result).NotTo(BeEmpty()) + }) + + It("complains if the token is set on a replica cluster (primary, self)", func() { + tokenContent := utils.PgControldataTokenContent{ + LatestCheckpointTimelineID: "1", + REDOWALFile: "0000000100000001000000A1", + DatabaseSystemIdentifier: "231231212", + LatestCheckpointREDOLocation: "0/1000000", + TimeOfLatestCheckpoint: "we don't know", + OperatorVersion: "version info", + } + jsonToken, err := json.Marshal(tokenContent) + Expect(err).ToNot(HaveOccurred()) + + cluster := &Cluster{ + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Primary: "test", + Self: "test2", + Source: "test", + PromotionToken: base64.StdEncoding.EncodeToString(jsonToken), + }, + }, + } + + result := cluster.validatePromotionToken() + Expect(result).NotTo(BeEmpty()) + }) +}) + var _ = Describe("replica mode validation", func() { It("complains if the bootstrap method is not specified", func() { cluster := &Cluster{ Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), Source: "test", }, ExternalClusters: []ExternalCluster{ @@ -2632,7 +2796,7 @@ var _ = Describe("replica mode validation", func() { cluster := &Cluster{ Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), Source: "test", }, Bootstrap: &BootstrapConfiguration{ @@ -2655,7 +2819,7 @@ var _ = Describe("replica mode validation", func() { }, Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), Source: "test", }, Bootstrap: &BootstrapConfiguration{ @@ -2679,7 +2843,7 @@ var _ = Describe("replica mode validation", func() { }, Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), Source: "test", }, Bootstrap: &BootstrapConfiguration{ @@ -2704,7 +2868,7 @@ var _ = Describe("replica mode validation", func() { }, Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: false, + Enabled: ptr.To(false), Source: "test", }, Bootstrap: &BootstrapConfiguration{ @@ -2733,7 +2897,7 @@ var _ = Describe("replica mode validation", func() { cluster := &Cluster{ Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), Source: "test", }, Bootstrap: &BootstrapConfiguration{ @@ -2754,7 +2918,7 @@ var _ = Describe("replica mode validation", func() { cluster := &Cluster{ Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), Source: "test", }, Bootstrap: &BootstrapConfiguration{ @@ -2771,11 +2935,12 @@ var _ = Describe("replica mode validation", func() { Expect(result).To(BeEmpty()) }) - It("complains when the external cluster doesn't exist", func() { + It("complains when the primary field is used with the enabled field", func() { cluster := &Cluster{ Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), + Primary: "toast", Source: "test", }, Bootstrap: &BootstrapConfiguration{ @@ -2784,25 +2949,22 @@ var _ = Describe("replica mode validation", func() { ExternalClusters: []ExternalCluster{}, }, } - - cluster.Spec.Bootstrap.PgBaseBackup = nil result := cluster.validateReplicaMode() Expect(result).ToNot(BeEmpty()) }) - It("complains if the replica token is not formatted in base64", func() { + It("doesn't complain when the enabled field is not specified", func() { cluster := &Cluster{ ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "existing", + Name: "test-2", }, Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: false, - Source: "test", - PromotionToken: "this-is-a-wrong-token", + Primary: "test", + Source: "test", }, Bootstrap: &BootstrapConfiguration{ - InitDB: &BootstrapInitDB{}, + PgBaseBackup: &BootstrapPgBaseBackup{}, }, ExternalClusters: []ExternalCluster{ { @@ -2811,21 +2973,19 @@ var _ = Describe("replica mode validation", func() { }, }, } - - result := cluster.validatePromotionToken() - Expect(result).ToNot(BeEmpty()) + result := cluster.validateReplicaMode() + Expect(result).To(BeEmpty()) }) - It("complains if the replica token is not valid", func() { + It("doesn't complain when creating a new primary cluster with the replication stanza set", func() { cluster := &Cluster{ ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "existing", + Name: "test", }, Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: false, - Source: "test", - PromotionToken: base64.StdEncoding.EncodeToString([]byte("{}")), + Primary: "test", + Source: "test", }, Bootstrap: &BootstrapConfiguration{ InitDB: &BootstrapInitDB{}, @@ -2837,35 +2997,63 @@ var _ = Describe("replica mode validation", func() { }, }, } + result := cluster.validateReplicaMode() + Expect(result).To(BeEmpty()) + }) +}) - result := cluster.validatePromotionToken() +var _ = Describe("validate the replica cluster external clusters", func() { + It("complains when the external cluster doesn't exist (source)", func() { + cluster := &Cluster{ + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Enabled: ptr.To(true), + Source: "test", + }, + Bootstrap: &BootstrapConfiguration{ + PgBaseBackup: &BootstrapPgBaseBackup{}, + }, + ExternalClusters: []ExternalCluster{}, + }, + } + + cluster.Spec.Bootstrap.PgBaseBackup = nil + result := cluster.validateReplicaClusterExternalClusters() Expect(result).ToNot(BeEmpty()) }) - It("doesn't complain if the replica token is valid", func() { - tokenContent := utils.PgControldataTokenContent{ - LatestCheckpointTimelineID: "3", - REDOWALFile: "this-wal-file", - DatabaseSystemIdentifier: "231231212", - LatestCheckpointREDOLocation: "33322232", - TimeOfLatestCheckpoint: "we don't know", - OperatorVersion: "version info", + It("complains when the external cluster doesn't exist (primary)", func() { + cluster := &Cluster{ + Spec: ClusterSpec{ + ReplicaCluster: &ReplicaClusterConfiguration{ + Primary: "test2", + Source: "test", + }, + Bootstrap: &BootstrapConfiguration{ + PgBaseBackup: &BootstrapPgBaseBackup{}, + }, + ExternalClusters: []ExternalCluster{ + { + Name: "test", + }, + }, + }, } - jsonToken, err := json.Marshal(tokenContent) - Expect(err).ToNot(HaveOccurred()) + result := cluster.validateReplicaClusterExternalClusters() + Expect(result).ToNot(BeEmpty()) + }) + + It("complains when the external cluster doesn't exist (self)", func() { cluster := &Cluster{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "existing", - }, Spec: ClusterSpec{ ReplicaCluster: &ReplicaClusterConfiguration{ - Enabled: true, - Source: "test", - PromotionToken: base64.StdEncoding.EncodeToString(jsonToken), + Self: "test2", + Primary: "test", + Source: "test", }, Bootstrap: &BootstrapConfiguration{ - InitDB: &BootstrapInitDB{}, + PgBaseBackup: &BootstrapPgBaseBackup{}, }, ExternalClusters: []ExternalCluster{ { @@ -2875,8 +3063,8 @@ var _ = Describe("replica mode validation", func() { }, } - result := cluster.validatePromotionToken() - Expect(result).To(BeEmpty()) + result := cluster.validateReplicaClusterExternalClusters() + Expect(result).ToNot(BeEmpty()) }) }) diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 5b905e3434..baa526cbbf 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -788,7 +788,7 @@ func (in *ClusterSpec) DeepCopyInto(out *ClusterSpec) { if in.ReplicaCluster != nil { in, out := &in.ReplicaCluster, &out.ReplicaCluster *out = new(ReplicaClusterConfiguration) - **out = **in + (*in).DeepCopyInto(*out) } if in.SuperuserSecret != nil { in, out := &in.SuperuserSecret, &out.SuperuserSecret @@ -2203,6 +2203,11 @@ func (in *RecoveryTarget) DeepCopy() *RecoveryTarget { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ReplicaClusterConfiguration) DeepCopyInto(out *ReplicaClusterConfiguration) { *out = *in + if in.Enabled != nil { + in, out := &in.Enabled, &out.Enabled + *out = new(bool) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReplicaClusterConfiguration. diff --git a/config/crd/bases/postgresql.cnpg.io_clusters.yaml b/config/crd/bases/postgresql.cnpg.io_clusters.yaml index 0fd6c85e58..cac8b72bab 100644 --- a/config/crd/bases/postgresql.cnpg.io_clusters.yaml +++ b/config/crd/bases/postgresql.cnpg.io_clusters.yaml @@ -3855,24 +3855,29 @@ spec: object store or via streaming through pg_basebackup. Refer to the Replica clusters page of the documentation for more information. type: boolean + primary: + description: |- + Primary defines which Cluster is defined to be the primary in the distributed PostgreSQL cluster, based on the + topology specified in externalClusters + type: string promotionToken: description: |- A demotion token generated by an external cluster used to check if the promotion requirements are met. type: string + self: + description: |- + Self defines the name of this cluster. It is used to determine if this is a primary + or a replica cluster, comparing it with `primary` + type: string source: description: The name of the external cluster which is the replication origin minLength: 1 type: string required: - - enabled - source type: object - x-kubernetes-validations: - - message: Promotion token must be empty on replica clusters - rule: '!has(self.promotionToken) || size(self.promotionToken) == - 0 || !self.enabled' replicationSlots: default: highAvailability: diff --git a/docs/src/cloudnative-pg.v1.md b/docs/src/cloudnative-pg.v1.md index 91e31abfd6..6be8890eb1 100644 --- a/docs/src/cloudnative-pg.v1.md +++ b/docs/src/cloudnative-pg.v1.md @@ -4101,6 +4101,22 @@ cluster

+ + + + + + diff --git a/docs/src/samples/cluster-replica-tls.yaml b/docs/src/samples/cluster-replica-tls.yaml index ec72a47041..67954f62f1 100644 --- a/docs/src/samples/cluster-replica-tls.yaml +++ b/docs/src/samples/cluster-replica-tls.yaml @@ -10,7 +10,7 @@ spec: source: cluster-example replica: - enabled: true + primary: cluster-example source: cluster-example storage: diff --git a/docs/src/samples/dc/cluster-dc-a.yaml b/docs/src/samples/dc/cluster-dc-a.yaml index 5d179152fd..0da90de68c 100644 --- a/docs/src/samples/dc/cluster-dc-a.yaml +++ b/docs/src/samples/dc/cluster-dc-a.yaml @@ -25,7 +25,8 @@ spec: compression: gzip replica: - enabled: false + self: cluster-dc-a + primary: cluster-dc-a source: cluster-dc-b externalClusters: diff --git a/docs/src/samples/dc/cluster-dc-b.yaml b/docs/src/samples/dc/cluster-dc-b.yaml index 9ae7d626cd..355560b812 100644 --- a/docs/src/samples/dc/cluster-dc-b.yaml +++ b/docs/src/samples/dc/cluster-dc-b.yaml @@ -29,7 +29,8 @@ spec: source: cluster-dc-a replica: - enabled: true + self: cluster-dc-b + primary: cluster-dc-a source: cluster-dc-a externalClusters: diff --git a/internal/cmd/manager/walrestore/cmd_test.go b/internal/cmd/manager/walrestore/cmd_test.go index b66c446c8c..bc776e6b05 100644 --- a/internal/cmd/manager/walrestore/cmd_test.go +++ b/internal/cmd/manager/walrestore/cmd_test.go @@ -17,6 +17,8 @@ limitations under the License. package walrestore import ( + "k8s.io/utils/ptr" + apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" . "github.com/onsi/ginkgo/v2" @@ -58,7 +60,7 @@ var _ = Describe("Function isStreamingAvailable", func() { }, }, ReplicaCluster: &apiv1.ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), Source: "clusterSource", }, }, @@ -79,7 +81,7 @@ var _ = Describe("Function isStreamingAvailable", func() { }, }, ReplicaCluster: &apiv1.ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), Source: "clusterSource", }, }, @@ -100,7 +102,7 @@ var _ = Describe("Function isStreamingAvailable", func() { }, }, ReplicaCluster: &apiv1.ReplicaClusterConfiguration{ - Enabled: true, + Enabled: ptr.To(true), Source: "clusterSource", }, }, diff --git a/internal/controller/cluster_status.go b/internal/controller/cluster_status.go index 36223b3157..fe2c1201d7 100644 --- a/internal/controller/cluster_status.go +++ b/internal/controller/cluster_status.go @@ -347,7 +347,7 @@ func (r *ClusterReconciler) updateResourceStatus( cluster.Status.LastPromotionToken = "" } - if cluster.Spec.ReplicaCluster != nil && !cluster.Spec.ReplicaCluster.Enabled { + if !cluster.IsReplica() { cluster.Status.DemotionToken = "" } diff --git a/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-restart-1.yaml.template b/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-restart-1.yaml.template index f9d512e80d..483cc296b6 100644 --- a/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-restart-1.yaml.template +++ b/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-restart-1.yaml.template @@ -31,7 +31,7 @@ spec: immediateCheckpoint: true replica: - enabled: false + primary: replica-switchover-restart-a source: replica-switchover-restart-b externalClusters: diff --git a/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-restart-2.yaml.template b/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-restart-2.yaml.template index f5b59fb713..348c160631 100644 --- a/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-restart-2.yaml.template +++ b/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-restart-2.yaml.template @@ -35,7 +35,7 @@ spec: source: replica-switchover-restart-a replica: - enabled: true + primary: replica-switchover-restart-a source: replica-switchover-restart-a externalClusters: diff --git a/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-switchover-1.yaml.template b/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-switchover-1.yaml.template index 284d5d370c..10985407ef 100644 --- a/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-switchover-1.yaml.template +++ b/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-switchover-1.yaml.template @@ -31,7 +31,7 @@ spec: immediateCheckpoint: true replica: - enabled: false + primary: replica-switchover-switchover-a source: replica-switchover-switchover-b externalClusters: diff --git a/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-switchover-2.yaml.template b/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-switchover-2.yaml.template index e4cf4b2951..16124fef26 100644 --- a/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-switchover-2.yaml.template +++ b/tests/e2e/fixtures/replica_mode_cluster/cluster-replica-switchover-switchover-2.yaml.template @@ -35,7 +35,7 @@ spec: source: replica-switchover-switchover-a replica: - enabled: true + primary: replica-switchover-switchover-a source: replica-switchover-switchover-a externalClusters: diff --git a/tests/e2e/replica_mode_cluster_test.go b/tests/e2e/replica_mode_cluster_test.go index 9b901663c5..5b424f6372 100644 --- a/tests/e2e/replica_mode_cluster_test.go +++ b/tests/e2e/replica_mode_cluster_test.go @@ -174,7 +174,7 @@ var _ = Describe("Replica Mode", Label(tests.LabelReplication), func() { By("setting replica mode on the src cluster", func() { cluster, err := env.GetCluster(namespace, clusterOneName) Expect(err).ToNot(HaveOccurred()) - cluster.Spec.ReplicaCluster.Enabled = true + cluster.Spec.ReplicaCluster.Enabled = ptr.To(true) err = env.Client.Update(ctx, cluster) Expect(err).ToNot(HaveOccurred()) AssertClusterIsReady(namespace, clusterOneName, testTimeouts[testUtils.ClusterIsReady], env) @@ -200,7 +200,7 @@ var _ = Describe("Replica Mode", Label(tests.LabelReplication), func() { By("disabling the replica mode on the dst cluster", func() { cluster, err := env.GetCluster(namespace, clusterTwoName) Expect(err).ToNot(HaveOccurred()) - cluster.Spec.ReplicaCluster.Enabled = false + cluster.Spec.ReplicaCluster.Enabled = ptr.To(false) err = env.Client.Update(ctx, cluster) Expect(err).ToNot(HaveOccurred()) AssertClusterIsReady(namespace, clusterTwoName, testTimeouts[testUtils.ClusterIsReady], env) @@ -633,7 +633,7 @@ var _ = Describe("Replica switchover", Label(tests.LabelReplication), Ordered, f cluster, err := env.GetCluster(namespace, clusterAName) Expect(err).ToNot(HaveOccurred()) oldCluster := cluster.DeepCopy() - cluster.Spec.ReplicaCluster.Enabled = true + cluster.Spec.ReplicaCluster.Primary = clusterBName Expect(env.Client.Patch(env.Ctx, cluster, k8client.MergeFrom(oldCluster))).To(Succeed()) podList, err := env.GetClusterPodList(namespace, clusterAName) Expect(err).ToNot(HaveOccurred()) @@ -665,7 +665,7 @@ var _ = Describe("Replica switchover", Label(tests.LabelReplication), Ordered, f oldCluster := cluster.DeepCopy() cluster.Spec.ReplicaCluster.PromotionToken = invalidToken - cluster.Spec.ReplicaCluster.Enabled = false + cluster.Spec.ReplicaCluster.Primary = clusterBName Expect(env.Client.Patch(env.Ctx, cluster, k8client.MergeFrom(oldCluster))).To(Succeed()) }) @@ -688,7 +688,7 @@ var _ = Describe("Replica switchover", Label(tests.LabelReplication), Ordered, f Expect(err).ToNot(HaveOccurred()) oldCluster := cluster.DeepCopy() cluster.Spec.ReplicaCluster.PromotionToken = token - cluster.Spec.ReplicaCluster.Enabled = false + cluster.Spec.ReplicaCluster.Primary = clusterBName Expect(env.Client.Patch(env.Ctx, cluster, k8client.MergeFrom(oldCluster))).To(Succeed()) })
FieldDescription
self [Required]
+string +
+

Self defines the name of this cluster. It is used to determine if this is a primary +or a replica cluster, comparing it with primary

+
primary [Required]
+string +
+

Primary defines which Cluster is defined to be the primary in the distributed PostgreSQL cluster, based on the +topology specified in externalClusters

+
source [Required]
string