diff --git a/.wordlist-en-custom.txt b/.wordlist-en-custom.txt
index e564fc34f0..b493bdb8d3 100644
--- a/.wordlist-en-custom.txt
+++ b/.wordlist-en-custom.txt
@@ -338,6 +338,12 @@ PrimaryUpdateStrategy
PriorityClass
PriorityClassName
ProjectedVolumeSource
+PublicationReclaimPolicy
+PublicationSpec
+PublicationStatus
+PublicationTarget
+PublicationTargetAllTables
+PublicationTargetObject
PullPolicy
QoS
Quaresima
@@ -423,6 +429,9 @@ StatefulSets
StorageClass
StorageConfiguration
Storages
+SubscriptionReclaimPolicy
+SubscriptionSpec
+SubscriptionStatus
SuccessfullyExtracted
SwitchReplicaClusterStatus
SyncReplicaElectionConstraints
@@ -492,6 +501,7 @@ addons
affinityconfiguration
aks
albert
+allTables
allnamespaces
alloc
allocator
@@ -736,6 +746,7 @@ executables
expirations
extensibility
externalCluster
+externalClusterName
externalClusterSecretVersion
externalClusters
externalclusters
@@ -1056,6 +1067,10 @@ promotionTimeout
promotionToken
provisioner
psql
+publicate
+publicated
+publicationName
+publicationReclaimPolicy
pv
pvc
pvcCount
@@ -1200,6 +1215,7 @@ subcommand
subcommands
subdirectory
subresource
+subscriptionReclaimPolicy
substatement
successfullyExtracted
sudo
@@ -1218,6 +1234,7 @@ syslog
systemd
sysv
tAc
+tableExpression
tablespace
tablespaceClassName
tablespaceMapFile
diff --git a/PROJECT b/PROJECT
index 59c5113ca1..27b49f0be3 100644
--- a/PROJECT
+++ b/PROJECT
@@ -66,3 +66,21 @@ resources:
kind: Database
path: github.com/cloudnative-pg/cloudnative-pg/api/v1
version: v1
+- api:
+ crdVersion: v1
+ namespaced: true
+ controller: true
+ domain: cnpg.io
+ group: postgresql
+ kind: Publication
+ path: github.com/cloudnative-pg/cloudnative-pg/api/v1
+ version: v1
+- api:
+ crdVersion: v1
+ namespaced: true
+ controller: true
+ domain: cnpg.io
+ group: postgresql
+ kind: Subscription
+ path: github.com/cloudnative-pg/cloudnative-pg/api/v1
+ version: v1
diff --git a/api/v1/publication_funcs.go b/api/v1/publication_funcs.go
new file mode 100644
index 0000000000..57f73184a3
--- /dev/null
+++ b/api/v1/publication_funcs.go
@@ -0,0 +1,30 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1
+
+// SetAsFailed sets the publication as failed with the given error
+func (pub *Publication) SetAsFailed(err error) {
+ pub.Status.Ready = false
+ pub.Status.Error = err.Error()
+}
+
+// SetAsReady sets the subscription as working correctly
+func (pub *Publication) SetAsReady() {
+ pub.Status.Error = ""
+ pub.Status.Ready = true
+ pub.Status.ObservedGeneration = pub.Generation
+}
diff --git a/api/v1/publication_types.go b/api/v1/publication_types.go
new file mode 100644
index 0000000000..12a8e6c593
--- /dev/null
+++ b/api/v1/publication_types.go
@@ -0,0 +1,128 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1
+
+import (
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// PublicationReclaimPolicy describes a policy for end-of-life maintenance of Publications.
+// +enum
+type PublicationReclaimPolicy string
+
+const (
+ // PublicationReclaimDelete means the publication will be deleted from Kubernetes on release
+ // from its claim.
+ PublicationReclaimDelete PublicationReclaimPolicy = "delete"
+
+ // PublicationReclaimRetain means the publication will be left in its current phase for manual
+ // reclamation by the administrator. The default policy is Retain.
+ PublicationReclaimRetain PublicationReclaimPolicy = "retain"
+)
+
+// PublicationSpec defines the desired state of Publication
+type PublicationSpec struct {
+ // The corresponding cluster
+ ClusterRef corev1.LocalObjectReference `json:"cluster"`
+
+ // The name inside PostgreSQL
+ // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="name is immutable"
+ Name string `json:"name"`
+
+ // The name of the database
+ // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="dbname is immutable"
+ DBName string `json:"dbname"`
+
+ // The owner
+ Owner string `json:"owner,omitempty"`
+
+ // Parameters
+ Parameters map[string]string `json:"parameters,omitempty"`
+
+ // Publication target
+ Target PublicationTarget `json:"target,omitempty"`
+
+ // The policy for end-of-life maintenance of this publication
+ // +kubebuilder:validation:Enum=delete;retain
+ // +kubebuilder:default:=retain
+ // +optional
+ ReclaimPolicy PublicationReclaimPolicy `json:"publicationReclaimPolicy,omitempty"`
+}
+
+// PublicationTarget is what this publication should publish
+// +kubebuilder:validation:XValidation:rule="(has(self.allTables) && !has(self.objects)) || (!has(self.allTables) && has(self.objects))",message="allTables and objects are not compatible"
+type PublicationTarget struct {
+ // All tables should be publicated
+ AllTables bool `json:"allTables,omitempty"`
+
+ // Just the following schema objects
+ Objects []PublicationTargetObject `json:"objects,omitempty"`
+}
+
+// PublicationTargetObject is an object to publicate
+type PublicationTargetObject struct {
+ // The schema to publicate
+ Schema string `json:"schema,omitempty"`
+
+ // A list of table expressions
+ TableExpression []string `json:"tableExpression,omitempty"`
+}
+
+// PublicationStatus defines the observed state of Publication
+type PublicationStatus struct {
+ // A sequence number representing the latest
+ // desired state that was synchronized
+ // +optional
+ ObservedGeneration int64 `json:"observedGeneration,omitempty"`
+
+ // Ready is true if the database was reconciled correctly
+ Ready bool `json:"ready,omitempty"`
+
+ // Error is the reconciliation error message
+ Error string `json:"error,omitempty"`
+}
+
+// +kubebuilder:object:root=true
+// +kubebuilder:subresource:status
+// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
+// +kubebuilder:printcolumn:name="Cluster",type="string",JSONPath=".spec.cluster.name"
+// +kubebuilder:printcolumn:name="PG Name",type="string",JSONPath=".spec.name"
+// +kubebuilder:printcolumn:name="Ready",type="boolean",JSONPath=".status.ready"
+// +kubebuilder:printcolumn:name="Error",type="string",JSONPath=".status.error",description="Latest error message"
+
+// Publication is the Schema for the publications API
+type Publication struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ObjectMeta `json:"metadata"`
+
+ Spec PublicationSpec `json:"spec"`
+ Status PublicationStatus `json:"status,omitempty"`
+}
+
+// +kubebuilder:object:root=true
+
+// PublicationList contains a list of Publication
+type PublicationList struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ListMeta `json:"metadata,omitempty"`
+ Items []Publication `json:"items"`
+}
+
+func init() {
+ SchemeBuilder.Register(&Publication{}, &PublicationList{})
+}
diff --git a/api/v1/subscription_funcs.go b/api/v1/subscription_funcs.go
new file mode 100644
index 0000000000..40511e1380
--- /dev/null
+++ b/api/v1/subscription_funcs.go
@@ -0,0 +1,30 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1
+
+// SetAsFailed sets the subscription as failed with the given error
+func (sub *Subscription) SetAsFailed(err error) {
+ sub.Status.Ready = false
+ sub.Status.Error = err.Error()
+}
+
+// SetAsReady sets the subscription as working correctly
+func (sub *Subscription) SetAsReady() {
+ sub.Status.Error = ""
+ sub.Status.Ready = true
+ sub.Status.ObservedGeneration = sub.Generation
+}
diff --git a/api/v1/subscription_types.go b/api/v1/subscription_types.go
new file mode 100644
index 0000000000..e60bb8a187
--- /dev/null
+++ b/api/v1/subscription_types.go
@@ -0,0 +1,113 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1
+
+import (
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// SubscriptionReclaimPolicy describes a policy for end-of-life maintenance of Subscriptions.
+// +enum
+type SubscriptionReclaimPolicy string
+
+const (
+ // SubscriptionReclaimDelete means the subscription will be deleted from Kubernetes on release
+ // from its claim.
+ SubscriptionReclaimDelete SubscriptionReclaimPolicy = "delete"
+
+ // SubscriptionReclaimRetain means the subscription will be left in its current phase for manual
+ // reclamation by the administrator. The default policy is Retain.
+ SubscriptionReclaimRetain SubscriptionReclaimPolicy = "retain"
+)
+
+// SubscriptionSpec defines the desired state of Subscription
+type SubscriptionSpec struct {
+ // The corresponding cluster
+ ClusterRef corev1.LocalObjectReference `json:"cluster"`
+
+ // The name inside PostgreSQL
+ // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="name is immutable"
+ Name string `json:"name"`
+
+ // The owner
+ Owner string `json:"owner,omitempty"`
+
+ // The name of the database
+ // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="dbname is immutable"
+ DBName string `json:"dbname"`
+
+ // Parameters
+ // +optional
+ Parameters map[string]string `json:"parameters,omitempty"`
+
+ // The name of the publication
+ PublicationName string `json:"publicationName"`
+
+ // The name of the external cluster with the publication
+ ExternalClusterName string `json:"externalClusterName"`
+
+ // The policy for end-of-life maintenance of this subscription
+ // +kubebuilder:validation:Enum=delete;retain
+ // +kubebuilder:default:=retain
+ // +optional
+ ReclaimPolicy SubscriptionReclaimPolicy `json:"subscriptionReclaimPolicy,omitempty"`
+}
+
+// SubscriptionStatus defines the observed state of Subscription
+type SubscriptionStatus struct {
+ // A sequence number representing the latest
+ // desired state that was synchronized
+ // +optional
+ ObservedGeneration int64 `json:"observedGeneration,omitempty"`
+
+ // Ready is true if the database was reconciled correctly
+ Ready bool `json:"ready,omitempty"`
+
+ // Error is the reconciliation error message
+ Error string `json:"error,omitempty"`
+}
+
+// +kubebuilder:object:root=true
+// +kubebuilder:subresource:status
+// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
+// +kubebuilder:printcolumn:name="Cluster",type="string",JSONPath=".spec.cluster.name"
+// +kubebuilder:printcolumn:name="PG Name",type="string",JSONPath=".spec.name"
+// +kubebuilder:printcolumn:name="Ready",type="boolean",JSONPath=".status.ready"
+// +kubebuilder:printcolumn:name="Error",type="string",JSONPath=".status.error",description="Latest error message"
+
+// Subscription is the Schema for the subscriptions API
+type Subscription struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ObjectMeta `json:"metadata"`
+
+ Spec SubscriptionSpec `json:"spec"`
+ Status SubscriptionStatus `json:"status,omitempty"`
+}
+
+// +kubebuilder:object:root=true
+
+// SubscriptionList contains a list of Subscription
+type SubscriptionList struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ListMeta `json:"metadata,omitempty"`
+ Items []Subscription `json:"items"`
+}
+
+func init() {
+ SchemeBuilder.Register(&Subscription{}, &SubscriptionList{})
+}
diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go
index b9b4cf7690..f045980586 100644
--- a/api/v1/zz_generated.deepcopy.go
+++ b/api/v1/zz_generated.deepcopy.go
@@ -2170,6 +2170,146 @@ func (in *PostgresConfiguration) DeepCopy() *PostgresConfiguration {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Publication) DeepCopyInto(out *Publication) {
+ *out = *in
+ out.TypeMeta = in.TypeMeta
+ in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+ in.Spec.DeepCopyInto(&out.Spec)
+ out.Status = in.Status
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Publication.
+func (in *Publication) DeepCopy() *Publication {
+ if in == nil {
+ return nil
+ }
+ out := new(Publication)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *Publication) DeepCopyObject() runtime.Object {
+ if c := in.DeepCopy(); c != nil {
+ return c
+ }
+ return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PublicationList) DeepCopyInto(out *PublicationList) {
+ *out = *in
+ out.TypeMeta = in.TypeMeta
+ in.ListMeta.DeepCopyInto(&out.ListMeta)
+ if in.Items != nil {
+ in, out := &in.Items, &out.Items
+ *out = make([]Publication, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PublicationList.
+func (in *PublicationList) DeepCopy() *PublicationList {
+ if in == nil {
+ return nil
+ }
+ out := new(PublicationList)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *PublicationList) DeepCopyObject() runtime.Object {
+ if c := in.DeepCopy(); c != nil {
+ return c
+ }
+ return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PublicationSpec) DeepCopyInto(out *PublicationSpec) {
+ *out = *in
+ out.ClusterRef = in.ClusterRef
+ if in.Parameters != nil {
+ in, out := &in.Parameters, &out.Parameters
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+ in.Target.DeepCopyInto(&out.Target)
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PublicationSpec.
+func (in *PublicationSpec) DeepCopy() *PublicationSpec {
+ if in == nil {
+ return nil
+ }
+ out := new(PublicationSpec)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PublicationStatus) DeepCopyInto(out *PublicationStatus) {
+ *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PublicationStatus.
+func (in *PublicationStatus) DeepCopy() *PublicationStatus {
+ if in == nil {
+ return nil
+ }
+ out := new(PublicationStatus)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PublicationTarget) DeepCopyInto(out *PublicationTarget) {
+ *out = *in
+ if in.Objects != nil {
+ in, out := &in.Objects, &out.Objects
+ *out = make([]PublicationTargetObject, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PublicationTarget.
+func (in *PublicationTarget) DeepCopy() *PublicationTarget {
+ if in == nil {
+ return nil
+ }
+ out := new(PublicationTarget)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PublicationTargetObject) DeepCopyInto(out *PublicationTargetObject) {
+ *out = *in
+ if in.TableExpression != nil {
+ in, out := &in.TableExpression, &out.TableExpression
+ *out = make([]string, len(*in))
+ copy(*out, *in)
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PublicationTargetObject.
+func (in *PublicationTargetObject) DeepCopy() *PublicationTargetObject {
+ if in == nil {
+ return nil
+ }
+ out := new(PublicationTargetObject)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RecoveryTarget) DeepCopyInto(out *RecoveryTarget) {
*out = *in
@@ -2581,6 +2721,103 @@ func (in *StorageConfiguration) DeepCopy() *StorageConfiguration {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Subscription) DeepCopyInto(out *Subscription) {
+ *out = *in
+ out.TypeMeta = in.TypeMeta
+ in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+ in.Spec.DeepCopyInto(&out.Spec)
+ out.Status = in.Status
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Subscription.
+func (in *Subscription) DeepCopy() *Subscription {
+ if in == nil {
+ return nil
+ }
+ out := new(Subscription)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *Subscription) DeepCopyObject() runtime.Object {
+ if c := in.DeepCopy(); c != nil {
+ return c
+ }
+ return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *SubscriptionList) DeepCopyInto(out *SubscriptionList) {
+ *out = *in
+ out.TypeMeta = in.TypeMeta
+ in.ListMeta.DeepCopyInto(&out.ListMeta)
+ if in.Items != nil {
+ in, out := &in.Items, &out.Items
+ *out = make([]Subscription, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscriptionList.
+func (in *SubscriptionList) DeepCopy() *SubscriptionList {
+ if in == nil {
+ return nil
+ }
+ out := new(SubscriptionList)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *SubscriptionList) DeepCopyObject() runtime.Object {
+ if c := in.DeepCopy(); c != nil {
+ return c
+ }
+ return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *SubscriptionSpec) DeepCopyInto(out *SubscriptionSpec) {
+ *out = *in
+ out.ClusterRef = in.ClusterRef
+ if in.Parameters != nil {
+ in, out := &in.Parameters, &out.Parameters
+ *out = make(map[string]string, len(*in))
+ for key, val := range *in {
+ (*out)[key] = val
+ }
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscriptionSpec.
+func (in *SubscriptionSpec) DeepCopy() *SubscriptionSpec {
+ if in == nil {
+ return nil
+ }
+ out := new(SubscriptionSpec)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *SubscriptionStatus) DeepCopyInto(out *SubscriptionStatus) {
+ *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscriptionStatus.
+func (in *SubscriptionStatus) DeepCopy() *SubscriptionStatus {
+ if in == nil {
+ return nil
+ }
+ out := new(SubscriptionStatus)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SwitchReplicaClusterStatus) DeepCopyInto(out *SwitchReplicaClusterStatus) {
*out = *in
diff --git a/config/crd/bases/postgresql.cnpg.io_publications.yaml b/config/crd/bases/postgresql.cnpg.io_publications.yaml
new file mode 100644
index 0000000000..9663073ae7
--- /dev/null
+++ b/config/crd/bases/postgresql.cnpg.io_publications.yaml
@@ -0,0 +1,154 @@
+---
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ annotations:
+ controller-gen.kubebuilder.io/version: v0.16.4
+ name: publications.postgresql.cnpg.io
+spec:
+ group: postgresql.cnpg.io
+ names:
+ kind: Publication
+ listKind: PublicationList
+ plural: publications
+ singular: publication
+ scope: Namespaced
+ versions:
+ - additionalPrinterColumns:
+ - jsonPath: .metadata.creationTimestamp
+ name: Age
+ type: date
+ - jsonPath: .spec.cluster.name
+ name: Cluster
+ type: string
+ - jsonPath: .spec.name
+ name: PG Name
+ type: string
+ - jsonPath: .status.ready
+ name: Ready
+ type: boolean
+ - description: Latest error message
+ jsonPath: .status.error
+ name: Error
+ type: string
+ name: v1
+ schema:
+ openAPIV3Schema:
+ description: Publication is the Schema for the publications API
+ properties:
+ apiVersion:
+ description: |-
+ APIVersion defines the versioned schema of this representation of an object.
+ Servers should convert recognized schemas to the latest internal value, and
+ may reject unrecognized values.
+ More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
+ type: string
+ kind:
+ description: |-
+ Kind is a string value representing the REST resource this object represents.
+ Servers may infer this from the endpoint the client submits requests to.
+ Cannot be updated.
+ In CamelCase.
+ More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
+ type: string
+ metadata:
+ type: object
+ spec:
+ description: PublicationSpec defines the desired state of Publication
+ properties:
+ cluster:
+ description: The corresponding cluster
+ properties:
+ name:
+ default: ""
+ description: |-
+ Name of the referent.
+ This field is effectively required, but due to backwards compatibility is
+ allowed to be empty. Instances of this type with an empty value here are
+ almost certainly wrong.
+ More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
+ type: string
+ type: object
+ x-kubernetes-map-type: atomic
+ dbname:
+ description: The name of the database
+ type: string
+ x-kubernetes-validations:
+ - message: dbname is immutable
+ rule: self == oldSelf
+ name:
+ description: The name inside PostgreSQL
+ type: string
+ x-kubernetes-validations:
+ - message: name is immutable
+ rule: self == oldSelf
+ owner:
+ description: The owner
+ type: string
+ parameters:
+ additionalProperties:
+ type: string
+ description: Parameters
+ type: object
+ publicationReclaimPolicy:
+ default: retain
+ description: The policy for end-of-life maintenance of this publication
+ enum:
+ - delete
+ - retain
+ type: string
+ target:
+ description: Publication target
+ properties:
+ allTables:
+ description: All tables should be publicated
+ type: boolean
+ objects:
+ description: Just the following schema objects
+ items:
+ description: PublicationTargetObject is an object to publicate
+ properties:
+ schema:
+ description: The schema to publicate
+ type: string
+ tableExpression:
+ description: A list of table expressions
+ items:
+ type: string
+ type: array
+ type: object
+ type: array
+ type: object
+ x-kubernetes-validations:
+ - message: allTables and objects are not compatible
+ rule: (has(self.allTables) && !has(self.objects)) || (!has(self.allTables)
+ && has(self.objects))
+ required:
+ - cluster
+ - dbname
+ - name
+ type: object
+ status:
+ description: PublicationStatus defines the observed state of Publication
+ properties:
+ error:
+ description: Error is the reconciliation error message
+ type: string
+ observedGeneration:
+ description: |-
+ A sequence number representing the latest
+ desired state that was synchronized
+ format: int64
+ type: integer
+ ready:
+ description: Ready is true if the database was reconciled correctly
+ type: boolean
+ type: object
+ required:
+ - metadata
+ - spec
+ type: object
+ served: true
+ storage: true
+ subresources:
+ status: {}
diff --git a/config/crd/bases/postgresql.cnpg.io_subscriptions.yaml b/config/crd/bases/postgresql.cnpg.io_subscriptions.yaml
new file mode 100644
index 0000000000..a65f4fdabc
--- /dev/null
+++ b/config/crd/bases/postgresql.cnpg.io_subscriptions.yaml
@@ -0,0 +1,136 @@
+---
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ annotations:
+ controller-gen.kubebuilder.io/version: v0.16.4
+ name: subscriptions.postgresql.cnpg.io
+spec:
+ group: postgresql.cnpg.io
+ names:
+ kind: Subscription
+ listKind: SubscriptionList
+ plural: subscriptions
+ singular: subscription
+ scope: Namespaced
+ versions:
+ - additionalPrinterColumns:
+ - jsonPath: .metadata.creationTimestamp
+ name: Age
+ type: date
+ - jsonPath: .spec.cluster.name
+ name: Cluster
+ type: string
+ - jsonPath: .spec.name
+ name: PG Name
+ type: string
+ - jsonPath: .status.ready
+ name: Ready
+ type: boolean
+ - description: Latest error message
+ jsonPath: .status.error
+ name: Error
+ type: string
+ name: v1
+ schema:
+ openAPIV3Schema:
+ description: Subscription is the Schema for the subscriptions API
+ properties:
+ apiVersion:
+ description: |-
+ APIVersion defines the versioned schema of this representation of an object.
+ Servers should convert recognized schemas to the latest internal value, and
+ may reject unrecognized values.
+ More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
+ type: string
+ kind:
+ description: |-
+ Kind is a string value representing the REST resource this object represents.
+ Servers may infer this from the endpoint the client submits requests to.
+ Cannot be updated.
+ In CamelCase.
+ More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
+ type: string
+ metadata:
+ type: object
+ spec:
+ description: SubscriptionSpec defines the desired state of Subscription
+ properties:
+ cluster:
+ description: The corresponding cluster
+ properties:
+ name:
+ default: ""
+ description: |-
+ Name of the referent.
+ This field is effectively required, but due to backwards compatibility is
+ allowed to be empty. Instances of this type with an empty value here are
+ almost certainly wrong.
+ More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
+ type: string
+ type: object
+ x-kubernetes-map-type: atomic
+ dbname:
+ description: The name of the database
+ type: string
+ x-kubernetes-validations:
+ - message: dbname is immutable
+ rule: self == oldSelf
+ externalClusterName:
+ description: The name of the external cluster with the publication
+ type: string
+ name:
+ description: The name inside PostgreSQL
+ type: string
+ x-kubernetes-validations:
+ - message: name is immutable
+ rule: self == oldSelf
+ owner:
+ description: The owner
+ type: string
+ parameters:
+ additionalProperties:
+ type: string
+ description: Parameters
+ type: object
+ publicationName:
+ description: The name of the publication
+ type: string
+ subscriptionReclaimPolicy:
+ default: retain
+ description: The policy for end-of-life maintenance of this subscription
+ enum:
+ - delete
+ - retain
+ type: string
+ required:
+ - cluster
+ - dbname
+ - externalClusterName
+ - name
+ - publicationName
+ type: object
+ status:
+ description: SubscriptionStatus defines the observed state of Subscription
+ properties:
+ error:
+ description: Error is the reconciliation error message
+ type: string
+ observedGeneration:
+ description: |-
+ A sequence number representing the latest
+ desired state that was synchronized
+ format: int64
+ type: integer
+ ready:
+ description: Ready is true if the database was reconciled correctly
+ type: boolean
+ type: object
+ required:
+ - metadata
+ - spec
+ type: object
+ served: true
+ storage: true
+ subresources:
+ status: {}
diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml
index 5e4757d42c..6100960f12 100644
--- a/config/crd/kustomization.yaml
+++ b/config/crd/kustomization.yaml
@@ -11,6 +11,9 @@ resources:
- bases/postgresql.cnpg.io_imagecatalogs.yaml
- bases/postgresql.cnpg.io_clusterimagecatalogs.yaml
- bases/postgresql.cnpg.io_databases.yaml
+- bases/postgresql.cnpg.io_publications.yaml
+- bases/postgresql.cnpg.io_subscriptions.yaml
+
# +kubebuilder:scaffold:crdkustomizeresource
patches:
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix.
@@ -52,6 +55,17 @@ patches:
# kind: CustomResourceDefinition
# name: poolers.postgresql.cnpg.io
#- path: patches/cainjection_in_databases.yaml
+# target:
+# kind: CustomResourceDefinition
+# name: databases.postgresql.cnpg.io
+#- path: patches/cainjection_in_publications.yaml
+# target:
+# kind: CustomResourceDefinition
+# name: publications.postgresql.cnpg.io
+#- path: patches/cainjection_in_subscriptions.yaml
+# target:
+# kind: CustomResourceDefinition
+# name: subscriptions.postgresql.cnpg.io
# +kubebuilder:scaffold:crdkustomizecainjectionpatch
# the following config is for teaching kustomize how to do kustomization for CRDs.
diff --git a/config/olm-manifests/bases/cloudnative-pg.clusterserviceversion.yaml b/config/olm-manifests/bases/cloudnative-pg.clusterserviceversion.yaml
index 6f7a2108f5..0bf3485944 100644
--- a/config/olm-manifests/bases/cloudnative-pg.clusterserviceversion.yaml
+++ b/config/olm-manifests/bases/cloudnative-pg.clusterserviceversion.yaml
@@ -688,7 +688,7 @@ spec:
specDescriptors:
- path: databaseReclaimPolicy
displayName: Database reclaim policy
- description: Database reclame policy
+ description: Database reclaim policy
- path: cluster
displayName: Cluster requested to create the database
description: Cluster requested to create the database
@@ -698,3 +698,50 @@ spec:
- path: owner
displayName: Database Owner
description: Database Owner
+ - kind: Publication
+ name: publications.postgresql.cnpg.io
+ displayName: Publication
+ description: Declarative publication
+ version: v1
+ resources:
+ - kind: Cluster
+ name: ''
+ version: v1
+ specDescriptors:
+ - path: name
+ displayName: Publication name
+ description: Publication name
+ - path: dbname
+ displayName: Database name
+ description: Database name
+ - path: cluster
+ displayName: Cluster requested to create the publication
+ description: Cluster requested to create the publication
+ - path: target
+ displayName: Publication target
+ description: Publication target
+ - kind: Subscription
+ name: subscriptions.postgresql.cnpg.io
+ displayName: Subscription
+ description: Declarative subscription
+ version: v1
+ resources:
+ - kind: Cluster
+ name: ''
+ version: v1
+ specDescriptors:
+ - path: name
+ displayName: Subscription name
+ description: Subscription name
+ - path: dbname
+ displayName: Database name
+ description: Database name
+ - path: publicationName
+ displayName: Publication name
+ description: Publication name
+ - path: cluster
+ displayName: Cluster requested to create the subscription
+ description: Cluster requested to create the subscription
+ - path: externalClusterName
+ displayName: Name of the external cluster with publication
+ description: Name of the external cluster with publication
diff --git a/config/olm-samples/kustomization.yaml b/config/olm-samples/kustomization.yaml
index 205a50a544..6bb494f569 100644
--- a/config/olm-samples/kustomization.yaml
+++ b/config/olm-samples/kustomization.yaml
@@ -6,3 +6,5 @@ resources:
- postgresql_v1_imagecatalog.yaml
- postgresql_v1_clusterimagecatalog.yaml
- postgresql_v1_database.yaml
+- postgresql_v1_publication.yaml
+- postgresql_v1_subscription.yaml
diff --git a/config/olm-samples/postgresql_v1_publication.yaml b/config/olm-samples/postgresql_v1_publication.yaml
new file mode 100644
index 0000000000..598c02a2bb
--- /dev/null
+++ b/config/olm-samples/postgresql_v1_publication.yaml
@@ -0,0 +1,11 @@
+apiVersion: postgresql.cnpg.io/v1
+kind: Publication
+metadata:
+ name: publication-sample
+spec:
+ name: pub
+ dbname: app
+ cluster:
+ name: cluster-sample
+ target:
+ allTables: true
diff --git a/config/olm-samples/postgresql_v1_subscription.yaml b/config/olm-samples/postgresql_v1_subscription.yaml
new file mode 100644
index 0000000000..ecc016619b
--- /dev/null
+++ b/config/olm-samples/postgresql_v1_subscription.yaml
@@ -0,0 +1,11 @@
+apiVersion: postgresql.cnpg.io/v1
+kind: Subscription
+metadata:
+ name: subscription-sample
+spec:
+ name: sub
+ dbname: app
+ publicationName: pub
+ cluster:
+ name: cluster-sample-dest
+ externalClusterName: cluster-sample
diff --git a/config/rbac/kustomization.yaml b/config/rbac/kustomization.yaml
index 99493b37c4..a561c73dc9 100644
--- a/config/rbac/kustomization.yaml
+++ b/config/rbac/kustomization.yaml
@@ -14,6 +14,10 @@ resources:
# default, aiding admins in cluster management. Those roles are
# not used by the Project itself. You can comment the following lines
# if you do not want those helpers be installed with your Project.
+- subscription_editor_role.yaml
+- subscription_viewer_role.yaml
+- publication_editor_role.yaml
+- publication_viewer_role.yaml
- database_editor_role.yaml
- database_viewer_role.yaml
diff --git a/config/rbac/publication_editor_role.yaml b/config/rbac/publication_editor_role.yaml
new file mode 100644
index 0000000000..f741900fa3
--- /dev/null
+++ b/config/rbac/publication_editor_role.yaml
@@ -0,0 +1,27 @@
+# permissions for end users to edit publications.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ labels:
+ app.kubernetes.io/name: cloudnative-pg-kubebuilderv4
+ app.kubernetes.io/managed-by: kustomize
+ name: publication-editor-role
+rules:
+- apiGroups:
+ - postgresql.cnpg.io
+ resources:
+ - publications
+ verbs:
+ - create
+ - delete
+ - get
+ - list
+ - patch
+ - update
+ - watch
+- apiGroups:
+ - postgresql.cnpg.io
+ resources:
+ - publications/status
+ verbs:
+ - get
diff --git a/config/rbac/publication_viewer_role.yaml b/config/rbac/publication_viewer_role.yaml
new file mode 100644
index 0000000000..32e84f531f
--- /dev/null
+++ b/config/rbac/publication_viewer_role.yaml
@@ -0,0 +1,23 @@
+# permissions for end users to view publications.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ labels:
+ app.kubernetes.io/name: cloudnative-pg-kubebuilderv4
+ app.kubernetes.io/managed-by: kustomize
+ name: publication-viewer-role
+rules:
+- apiGroups:
+ - postgresql.cnpg.io
+ resources:
+ - publications
+ verbs:
+ - get
+ - list
+ - watch
+- apiGroups:
+ - postgresql.cnpg.io
+ resources:
+ - publications/status
+ verbs:
+ - get
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index ce1e7ded88..f47a568f0d 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -141,7 +141,9 @@ rules:
- clusters
- databases
- poolers
+ - publications
- scheduledbackups
+ - subscriptions
verbs:
- create
- delete
@@ -155,7 +157,9 @@ rules:
resources:
- backups/status
- databases/status
+ - publications/status
- scheduledbackups/status
+ - subscriptions/status
verbs:
- get
- patch
diff --git a/config/rbac/subscription_editor_role.yaml b/config/rbac/subscription_editor_role.yaml
new file mode 100644
index 0000000000..066b1c494d
--- /dev/null
+++ b/config/rbac/subscription_editor_role.yaml
@@ -0,0 +1,27 @@
+# permissions for end users to edit subscriptions.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ labels:
+ app.kubernetes.io/name: cloudnative-pg-kubebuilderv4
+ app.kubernetes.io/managed-by: kustomize
+ name: subscription-editor-role
+rules:
+- apiGroups:
+ - postgresql.cnpg.io
+ resources:
+ - subscriptions
+ verbs:
+ - create
+ - delete
+ - get
+ - list
+ - patch
+ - update
+ - watch
+- apiGroups:
+ - postgresql.cnpg.io
+ resources:
+ - subscriptions/status
+ verbs:
+ - get
diff --git a/config/rbac/subscription_viewer_role.yaml b/config/rbac/subscription_viewer_role.yaml
new file mode 100644
index 0000000000..4cf8ff0d06
--- /dev/null
+++ b/config/rbac/subscription_viewer_role.yaml
@@ -0,0 +1,23 @@
+# permissions for end users to view subscriptions.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ labels:
+ app.kubernetes.io/name: cloudnative-pg-kubebuilderv4
+ app.kubernetes.io/managed-by: kustomize
+ name: subscription-viewer-role
+rules:
+- apiGroups:
+ - postgresql.cnpg.io
+ resources:
+ - subscriptions
+ verbs:
+ - get
+ - list
+ - watch
+- apiGroups:
+ - postgresql.cnpg.io
+ resources:
+ - subscriptions/status
+ verbs:
+ - get
diff --git a/contribute/e2e_testing_environment/README.md b/contribute/e2e_testing_environment/README.md
index 30a41ddaf4..dd956ab464 100644
--- a/contribute/e2e_testing_environment/README.md
+++ b/contribute/e2e_testing_environment/README.md
@@ -206,6 +206,7 @@ exported, it will select all medium test cases from the feature type provided.
| `security` |
| `maintenance` |
| `tablespaces` |
+| `publication-subscription` |
| `declarative-databases` |
ex:
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index b3dd9c55f7..5d85a9460a 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -22,6 +22,7 @@ nav:
- image_catalog.md
- bootstrap.md
- database_import.md
+ - declarative_publication_subscription_management.md
- security.md
- instance_manager.md
- scheduling.md
diff --git a/docs/src/cloudnative-pg.v1.md b/docs/src/cloudnative-pg.v1.md
index f5678cb22e..2b6d6c28ed 100644
--- a/docs/src/cloudnative-pg.v1.md
+++ b/docs/src/cloudnative-pg.v1.md
@@ -3951,6 +3951,218 @@ the primary server of the cluster as part of rolling updates
+## Publication {#postgresql-cnpg-io-v1-Publication}
+
+
+
+Publication is the Schema for the publications API
+
+
+
+Field | Description |
+
+metadata [Required]
+meta/v1.ObjectMeta
+ |
+
+ No description provided.Refer to the Kubernetes API documentation for the fields of the metadata field. |
+
+spec [Required]
+PublicationSpec
+ |
+
+ No description provided. |
+
+status [Required]
+PublicationStatus
+ |
+
+ No description provided. |
+
+
+
+
+## PublicationReclaimPolicy {#postgresql-cnpg-io-v1-PublicationReclaimPolicy}
+
+(Alias of `string`)
+
+**Appears in:**
+
+- [PublicationSpec](#postgresql-cnpg-io-v1-PublicationSpec)
+
+
+PublicationReclaimPolicy describes a policy for end-of-life maintenance of Publications.
+
+
+
+
+## PublicationSpec {#postgresql-cnpg-io-v1-PublicationSpec}
+
+
+**Appears in:**
+
+- [Publication](#postgresql-cnpg-io-v1-Publication)
+
+
+PublicationSpec defines the desired state of Publication
+
+
+
+Field | Description |
+
+cluster [Required]
+core/v1.LocalObjectReference
+ |
+
+ The corresponding cluster
+ |
+
+name [Required]
+string
+ |
+
+ The name inside PostgreSQL
+ |
+
+dbname [Required]
+string
+ |
+
+ The name of the database
+ |
+
+owner [Required]
+string
+ |
+
+ The owner
+ |
+
+parameters [Required]
+map[string]string
+ |
+
+ Parameters
+ |
+
+target [Required]
+PublicationTarget
+ |
+
+ Publication target
+ |
+
+publicationReclaimPolicy
+PublicationReclaimPolicy
+ |
+
+ The policy for end-of-life maintenance of this publication
+ |
+
+
+
+
+## PublicationStatus {#postgresql-cnpg-io-v1-PublicationStatus}
+
+
+**Appears in:**
+
+- [Publication](#postgresql-cnpg-io-v1-Publication)
+
+
+PublicationStatus defines the observed state of Publication
+
+
+
+Field | Description |
+
+observedGeneration
+int64
+ |
+
+ A sequence number representing the latest
+desired state that was synchronized
+ |
+
+ready [Required]
+bool
+ |
+
+ Ready is true if the database was reconciled correctly
+ |
+
+error [Required]
+string
+ |
+
+ Error is the reconciliation error message
+ |
+
+
+
+
+## PublicationTarget {#postgresql-cnpg-io-v1-PublicationTarget}
+
+
+**Appears in:**
+
+- [PublicationSpec](#postgresql-cnpg-io-v1-PublicationSpec)
+
+
+PublicationTarget is what this publication should publish
+
+
+
+Field | Description |
+
+allTables [Required]
+bool
+ |
+
+ All tables should be publicated
+ |
+
+objects [Required]
+[]PublicationTargetObject
+ |
+
+ Just the following schema objects
+ |
+
+
+
+
+## PublicationTargetObject {#postgresql-cnpg-io-v1-PublicationTargetObject}
+
+
+**Appears in:**
+
+- [PublicationTarget](#postgresql-cnpg-io-v1-PublicationTarget)
+
+
+PublicationTargetObject is an object to publicate
+
+
+
+Field | Description |
+
+schema [Required]
+string
+ |
+
+ The schema to publicate
+ |
+
+tableExpression [Required]
+[]string
+ |
+
+ A list of table expressions
+ |
+
+
+
+
## RecoveryTarget {#postgresql-cnpg-io-v1-RecoveryTarget}
@@ -4813,6 +5025,163 @@ Size cannot be decreased.
+## Subscription {#postgresql-cnpg-io-v1-Subscription}
+
+
+
+Subscription is the Schema for the subscriptions API
+
+
+
+Field | Description |
+
+metadata [Required]
+meta/v1.ObjectMeta
+ |
+
+ No description provided.Refer to the Kubernetes API documentation for the fields of the metadata field. |
+
+spec [Required]
+SubscriptionSpec
+ |
+
+ No description provided. |
+
+status [Required]
+SubscriptionStatus
+ |
+
+ No description provided. |
+
+
+
+
+## SubscriptionReclaimPolicy {#postgresql-cnpg-io-v1-SubscriptionReclaimPolicy}
+
+(Alias of `string`)
+
+**Appears in:**
+
+- [SubscriptionSpec](#postgresql-cnpg-io-v1-SubscriptionSpec)
+
+
+SubscriptionReclaimPolicy describes a policy for end-of-life maintenance of Subscriptions.
+
+
+
+
+## SubscriptionSpec {#postgresql-cnpg-io-v1-SubscriptionSpec}
+
+
+**Appears in:**
+
+- [Subscription](#postgresql-cnpg-io-v1-Subscription)
+
+
+SubscriptionSpec defines the desired state of Subscription
+
+
+
+Field | Description |
+
+cluster [Required]
+core/v1.LocalObjectReference
+ |
+
+ The corresponding cluster
+ |
+
+name [Required]
+string
+ |
+
+ The name inside PostgreSQL
+ |
+
+owner [Required]
+string
+ |
+
+ The owner
+ |
+
+dbname [Required]
+string
+ |
+
+ The name of the database
+ |
+
+parameters
+map[string]string
+ |
+
+ Parameters
+ |
+
+publicationName [Required]
+string
+ |
+
+ The name of the publication
+ |
+
+externalClusterName [Required]
+string
+ |
+
+ The name of the external cluster with the publication
+ |
+
+subscriptionReclaimPolicy
+SubscriptionReclaimPolicy
+ |
+
+ The policy for end-of-life maintenance of this subscription
+ |
+
+
+
+
+## SubscriptionStatus {#postgresql-cnpg-io-v1-SubscriptionStatus}
+
+
+**Appears in:**
+
+- [Subscription](#postgresql-cnpg-io-v1-Subscription)
+
+
+SubscriptionStatus defines the observed state of Subscription
+
+
+
+Field | Description |
+
+observedGeneration
+int64
+ |
+
+ A sequence number representing the latest
+desired state that was synchronized
+ |
+
+ready [Required]
+bool
+ |
+
+ Ready is true if the database was reconciled correctly
+ |
+
+error [Required]
+string
+ |
+
+ Error is the reconciliation error message
+ |
+
+
+
+
## SwitchReplicaClusterStatus {#postgresql-cnpg-io-v1-SwitchReplicaClusterStatus}
diff --git a/docs/src/declarative_publication_subscription_management.md b/docs/src/declarative_publication_subscription_management.md
new file mode 100644
index 0000000000..7f4ab87e45
--- /dev/null
+++ b/docs/src/declarative_publication_subscription_management.md
@@ -0,0 +1,161 @@
+# Declarative Publication/Subscription Management
+
+Declarative publication/subscription management enables users to set up
+logical replication via the following Custom Resource Definitions (CRD):
+
+- `Database` ,
+- `Publication`,
+- `Subscription`,
+
+The Database CRD is discussed in depth in the
+["Declarative database management"](declarative_database_management.md) section.
+In this section we describe `Publication` and `Subscription` in more detail.
+
+## Overview
+
+The procedure to set up logical replication:
+
+- Begins with two CloudNativePG clusters.
+ - One of them will be the "source"
+ - The "destination" cluster should have an `externalClusters` stanza
+ containing the connection information to the source cluster
+- A Database object creating a database (e.g. named `sample`) in the source
+ cluster
+- A Database object creating a database with the same name in the destination
+ cluster
+- A Publication in the source cluster referencing the database
+- A Subscription in the destination cluster, referencing the Publication that
+ was created in the previous step
+
+Once these objects are reconciled, PostgreSQL will replicate the data from
+the source cluster to the destination cluster using logical replication. There
+are many use cases for logical replication; please refer to the
+[PostgreSQL documentation](https://www.postgresql.org/docs/current/logical-replication.html)
+for detailed discussion.
+
+!!! Note
+ the `externalClusters` section in the destination cluster has the same
+ structure used in [database import](database_import.md) as well as for
+ replica clusters. However, the destination cluster does not necessarily
+ have to be bootstrapped via replication nor import.
+
+### Example: Simple Publication Declaration
+
+A `Publication` object is managed by the instance manager of the source
+cluster's primary instance.
+Below is an example of a basic `Publication` configuration:
+
+```yaml
+apiVersion: postgresql.cnpg.io/v1
+kind: Publication
+metadata:
+ name: pub-one
+spec:
+ name: pub
+ dbname: cat
+ cluster:
+ name: source-cluster
+ target:
+ allTables: true
+```
+
+The `dbname` field specifies the database the publication is applied to.
+Once the reconciliation cycle is completed successfully, the `Publication`
+status will show a `ready` field set to `true`, and an empty `error` field.
+
+### Publication Deletion and Reclaim Policies
+
+A finalizer named `cnpg.io/deletePublication` is automatically added
+to each `Publication` object to control its deletion process.
+
+By default, the `publicationReclaimPolicy` is set to `retain`, which means
+that if the `Publication` object is deleted, the actual PostgreSQL publication
+is retained for manual management by an administrator.
+
+Alternatively, if the `publicationReclaimPolicy` is set to `delete`,
+the PostgreSQL publication will be automatically deleted when the `Publication`
+object is removed.
+
+### Example: Publication with Delete Reclaim Policy
+
+The following example illustrates a `Publication` object with a `delete`
+reclaim policy:
+
+```yaml
+apiVersion: postgresql.cnpg.io/v1
+kind: Publication
+metadata:
+ name: pub-one
+spec:
+ name: pub
+ dbname: cat
+ publicationReclaimPolicy: delete
+ cluster:
+ name: source-cluster
+ target:
+ allTables: true
+```
+
+In this case, when the `Publication` object is deleted, the corresponding PostgreSQL publication will also be removed automatically.
+
+### Example: Simple Subscription Declaration
+
+A `Subscription` object is managed by the instance manager of the destination
+cluster's primary instance.
+Below is an example of a basic `Subscription` configuration:
+
+```yaml
+apiVersion: postgresql.cnpg.io/v1
+kind: Subscription
+metadata:
+ name: sub-one
+spec:
+ name: sub
+ dbname: cat
+ publicationName: pub
+ cluster:
+ name: destination-cluster
+ externalClusterName: source-cluster
+```
+
+The `dbname` field specifies the database the publication is applied to.
+The `publicationName` field specifies the name of the publication the subscription refers to.
+The `externalClusterName` field specifies the external cluster the publication belongs to.
+
+Once the reconciliation cycle is completed successfully, the `Subscription`
+status will show a `ready` field set to `true` and an empty `error` field.
+
+## Subscription Deletion and Reclaim Policies
+
+A finalizer named `cnpg.io/deleteSubscription` is automatically added
+to each `Subscription` object to control its deletion process.
+
+By default, the `subscriptionReclaimPolicy` is set to `retain`, which means
+that if the `Subscription` object is deleted, the actual PostgreSQL publication
+is retained for manual management by an administrator.
+
+Alternatively, if the `subscriptionReclaimPolicy` is set to `delete`,
+the PostgreSQL publication will be automatically deleted when the `Publication`
+object is removed.
+
+### Example: Subscription with Delete Reclaim Policy
+
+The following example illustrates a `Subscription` object with a `delete`
+reclaim policy:
+
+```yaml
+apiVersion: postgresql.cnpg.io/v1
+kind: Subscription
+metadata:
+ name: sub-one
+spec:
+ name: sub
+ dbname: cat
+ publicationName: pub
+ subscriptionReclaimPolicy: delete
+ cluster:
+ name: destination-cluster
+ externalClusterName: source-cluster
+```
+
+In this case, when the `Subscription` object is deleted, the corresponding PostgreSQL publication will also be removed automatically.
diff --git a/docs/src/e2e.md b/docs/src/e2e.md
index e796db13b6..de06101da5 100644
--- a/docs/src/e2e.md
+++ b/docs/src/e2e.md
@@ -60,6 +60,7 @@ and the following suite of E2E tests are performed on that cluster:
* Replication Slots
* Synchronous replication
* Scale-up and scale-down of a Cluster
+ * Logical replication via declarative Publication / Subscription
* **Replica clusters**
* Bootstrapping a replica cluster from backup
diff --git a/docs/src/samples/cluster-example-logical-destination.yaml b/docs/src/samples/cluster-example-logical-destination.yaml
index 75cb3f2af2..b119c9a4c8 100644
--- a/docs/src/samples/cluster-example-logical-destination.yaml
+++ b/docs/src/samples/cluster-example-logical-destination.yaml
@@ -31,3 +31,15 @@ spec:
password:
name: cluster-example-superuser
key: password
+---
+apiVersion: postgresql.cnpg.io/v1
+kind: Subscription
+metadata:
+ name: cluster-example-dest-sub
+spec:
+ name: sub
+ dbname: app
+ publicationName: pub
+ cluster:
+ name: cluster-example-dest
+ externalClusterName: cluster-example
diff --git a/docs/src/samples/cluster-example-logical-source.yaml b/docs/src/samples/cluster-example-logical-source.yaml
index ad9f888353..991c99448d 100644
--- a/docs/src/samples/cluster-example-logical-source.yaml
+++ b/docs/src/samples/cluster-example-logical-source.yaml
@@ -30,3 +30,15 @@ spec:
- name: app
login: true
replication: true
+---
+apiVersion: postgresql.cnpg.io/v1
+kind: Publication
+metadata:
+ name: cluster-example-pub
+spec:
+ name: pub
+ dbname: app
+ cluster:
+ name: cluster-example
+ target:
+ allTables: true
diff --git a/docs/src/samples/publication-example.yaml b/docs/src/samples/publication-example.yaml
new file mode 100644
index 0000000000..adb9ee4fcd
--- /dev/null
+++ b/docs/src/samples/publication-example.yaml
@@ -0,0 +1,11 @@
+apiVersion: postgresql.cnpg.io/v1
+kind: Publication
+metadata:
+ name: publication-sample
+spec:
+ name: pub
+ dbname: app
+ cluster:
+ name: cluster-example
+ target:
+ allTables: true
diff --git a/docs/src/samples/subscription-example.yaml b/docs/src/samples/subscription-example.yaml
new file mode 100644
index 0000000000..126a28ff5f
--- /dev/null
+++ b/docs/src/samples/subscription-example.yaml
@@ -0,0 +1,11 @@
+apiVersion: postgresql.cnpg.io/v1
+kind: Subscription
+metadata:
+ name: subscription-sample
+spec:
+ name: sub
+ dbname: app
+ publicationName: pub
+ cluster:
+ name: cluster-example-dest
+ externalClusterName: cluster-example
diff --git a/go.mod b/go.mod
index d0ca1763c6..b519ff235d 100644
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,7 @@ require (
github.com/blang/semver v3.5.1+incompatible
github.com/cheynewallace/tabby v1.1.1
github.com/cloudnative-pg/barman-cloud v0.0.0-20241016085606-44f56f711a5c
- github.com/cloudnative-pg/cnpg-i v0.0.0-20241001103001-7e24b2eccd50
+ github.com/cloudnative-pg/cnpg-i v0.0.0-20241016132832-8d61352831c6
github.com/cloudnative-pg/machinery v0.0.0-20241014090714-c27747f9974b
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/evanphx/json-patch/v5 v5.9.0
@@ -114,7 +114,7 @@ require (
golang.org/x/tools v0.25.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
- google.golang.org/protobuf v1.34.2 // indirect
+ google.golang.org/protobuf v1.35.1 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
diff --git a/go.sum b/go.sum
index 19a21dfdaa..9be832577f 100644
--- a/go.sum
+++ b/go.sum
@@ -20,8 +20,8 @@ github.com/cheynewallace/tabby v1.1.1 h1:JvUR8waht4Y0S3JF17G6Vhyt+FRhnqVCkk8l4Yr
github.com/cheynewallace/tabby v1.1.1/go.mod h1:Pba/6cUL8uYqvOc9RkyvFbHGrQ9wShyrn6/S/1OYVys=
github.com/cloudnative-pg/barman-cloud v0.0.0-20241016085606-44f56f711a5c h1:JQK5GOXSukWTInG5GzgmlTwY/rs5yO446+xy09NqbLg=
github.com/cloudnative-pg/barman-cloud v0.0.0-20241016085606-44f56f711a5c/go.mod h1:Jm0tOp5oB7utpt8wz6RfSv31h1mThOtffjfyxVupriE=
-github.com/cloudnative-pg/cnpg-i v0.0.0-20241001103001-7e24b2eccd50 h1:Rm/bbC0GNCuWth5fHVMos99RzNczbWRVBdjubh3JMPs=
-github.com/cloudnative-pg/cnpg-i v0.0.0-20241001103001-7e24b2eccd50/go.mod h1:lTWPq8pluS0PSnRMwt0zShftbyssoRhTJ5zAip8unl8=
+github.com/cloudnative-pg/cnpg-i v0.0.0-20241016132832-8d61352831c6 h1:QokKbYfQ0sRWMHDB0sVUL1H/kGQki+AXBfBRp7J+9Og=
+github.com/cloudnative-pg/cnpg-i v0.0.0-20241016132832-8d61352831c6/go.mod h1:fAU7ySVzjpt/RZntxWZiWJCjaBJayzIxEnd0NuO7oQc=
github.com/cloudnative-pg/machinery v0.0.0-20241014090714-c27747f9974b h1:4Q2VQsPlLHliJdi87zodQ0FHLd1cJINMm4N70eu8rRg=
github.com/cloudnative-pg/machinery v0.0.0-20241014090714-c27747f9974b/go.mod h1:+mUFdys1IX+qwQUrV+/i56Tey/mYh8ZzWZYttwivRns=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
@@ -262,8 +262,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
-google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
-google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
+google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
+google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
diff --git a/internal/cmd/manager/instance/run/cmd.go b/internal/cmd/manager/instance/run/cmd.go
index 066360ddaa..e02d06d55c 100644
--- a/internal/cmd/manager/instance/run/cmd.go
+++ b/internal/cmd/manager/instance/run/cmd.go
@@ -165,6 +165,16 @@ func runSubCommand(ctx context.Context, instance *postgres.Instance) error {
instance.GetNamespaceName(): {},
},
},
+ &apiv1.Publication{}: {
+ Namespaces: map[string]cache.Config{
+ instance.GetNamespaceName(): {},
+ },
+ },
+ &apiv1.Subscription{}: {
+ Namespaces: map[string]cache.Config{
+ instance.GetNamespaceName(): {},
+ },
+ },
},
},
// We don't need a cache for secrets and configmap, as all reloads
@@ -215,6 +225,20 @@ func runSubCommand(ctx context.Context, instance *postgres.Instance) error {
return err
}
+ // database publication reconciler
+ publicationReconciler := controller.NewPublicationReconciler(mgr, instance)
+ if err := publicationReconciler.SetupWithManager(mgr); err != nil {
+ contextLogger.Error(err, "unable to create publication controller")
+ return err
+ }
+
+ // database subscription reconciler
+ subscriptionReconciler := controller.NewSubscriptionReconciler(mgr, instance)
+ if err := subscriptionReconciler.SetupWithManager(mgr); err != nil {
+ contextLogger.Error(err, "unable to create subscription controller")
+ return err
+ }
+
// postgres CSV logs handler (PGAudit too)
postgresLogPipe := logpipe.NewLogPipe()
if err := mgr.Add(postgresLogPipe); err != nil {
diff --git a/internal/management/controller/common.go b/internal/management/controller/common.go
new file mode 100644
index 0000000000..7f08ba2c3f
--- /dev/null
+++ b/internal/management/controller/common.go
@@ -0,0 +1,98 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "sort"
+
+ "github.com/lib/pq"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
+)
+
+type markableAsFailed interface {
+ client.Object
+ SetAsFailed(err error)
+}
+
+// markAsFailed marks the reconciliation as failed and logs the corresponding error
+func markAsFailed(
+ ctx context.Context,
+ cli client.Client,
+ resource markableAsFailed,
+ err error,
+) error {
+ oldResource := resource.DeepCopyObject().(markableAsFailed)
+ resource.SetAsFailed(err)
+ return cli.Status().Patch(ctx, resource, client.MergeFrom(oldResource))
+}
+
+type markableAsReady interface {
+ client.Object
+ SetAsReady()
+}
+
+// markAsReady marks the reconciliation as succeeded inside the resource
+func markAsReady(
+ ctx context.Context,
+ cli client.Client,
+ resource markableAsReady,
+) error {
+ oldResource := resource.DeepCopyObject().(markableAsReady)
+ resource.SetAsReady()
+
+ return cli.Status().Patch(ctx, resource, client.MergeFrom(oldResource))
+}
+
+func getClusterFromInstance(
+ ctx context.Context,
+ cli client.Client,
+ instance instanceInterface,
+) (*apiv1.Cluster, error) {
+ var cluster apiv1.Cluster
+ err := cli.Get(ctx, types.NamespacedName{
+ Name: instance.GetClusterName(),
+ Namespace: instance.GetNamespaceName(),
+ }, &cluster)
+ return &cluster, err
+}
+
+func toPostgresParameters(parameters map[string]string) string {
+ // create slice and store keys
+ keys := make([]string, 0, len(parameters))
+ for k := range parameters {
+ keys = append(keys, k)
+ }
+
+ // sort the slice by keys
+ sort.Strings(keys)
+
+ b := new(bytes.Buffer)
+ for _, key := range keys {
+ // TODO(armru): should we sanitize the key?
+ // TODO(armru): any alternative to pg.QuoteLiteral?
+ _, _ = fmt.Fprintf(b, "%s = %s, ", key, pq.QuoteLiteral(parameters[key]))
+ }
+
+ // pruning last 2 chars `, `
+ return b.String()[:len(b.String())-2]
+}
diff --git a/internal/management/controller/database_controller.go b/internal/management/controller/database_controller.go
index d2c4256bc7..a5295bdbc2 100644
--- a/internal/management/controller/database_controller.go
+++ b/internal/management/controller/database_controller.go
@@ -26,7 +26,6 @@ import (
"github.com/cloudnative-pg/machinery/pkg/log"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -292,18 +291,7 @@ func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
// GetCluster gets the managed cluster through the client
func (r *DatabaseReconciler) GetCluster(ctx context.Context) (*apiv1.Cluster, error) {
- var cluster apiv1.Cluster
- err := r.Client.Get(ctx,
- types.NamespacedName{
- Namespace: r.instance.GetNamespaceName(),
- Name: r.instance.GetClusterName(),
- },
- &cluster)
- if err != nil {
- return nil, err
- }
-
- return &cluster, nil
+ return getClusterFromInstance(ctx, r.Client, r.instance)
}
func (r *DatabaseReconciler) reconcileDatabase(ctx context.Context, obj *apiv1.Database) error {
diff --git a/internal/management/controller/finalizers.go b/internal/management/controller/finalizers.go
new file mode 100644
index 0000000000..ed334d16fb
--- /dev/null
+++ b/internal/management/controller/finalizers.go
@@ -0,0 +1,49 @@
+package controller
+
+import (
+ "context"
+
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+)
+
+type finalizerReconciler[T client.Object] struct {
+ cli client.Client
+ finalizerName string
+ onRemoveFunc func(ctx context.Context, resource T) error
+}
+
+func newFinalizerReconciler[T client.Object](
+ cli client.Client,
+ finalizerName string,
+ onRemoveFunc func(ctx context.Context, resource T) error,
+) *finalizerReconciler[T] {
+ return &finalizerReconciler[T]{
+ cli: cli,
+ finalizerName: finalizerName,
+ onRemoveFunc: onRemoveFunc,
+ }
+}
+
+func (f finalizerReconciler[T]) reconcile(ctx context.Context, resource T) error {
+ // add finalizer in non-deleted publications if not present
+ if resource.GetDeletionTimestamp().IsZero() {
+ if !controllerutil.AddFinalizer(resource, f.finalizerName) {
+ return nil
+ }
+ return f.cli.Update(ctx, resource)
+ }
+
+ // the publication is being deleted but no finalizer is present, we can quit
+ if !controllerutil.ContainsFinalizer(resource, f.finalizerName) {
+ return nil
+ }
+
+ if err := f.onRemoveFunc(ctx, resource); err != nil {
+ return err
+ }
+
+ // remove our finalizer from the list and update it.
+ controllerutil.RemoveFinalizer(resource, f.finalizerName)
+ return f.cli.Update(ctx, resource)
+}
diff --git a/internal/management/controller/manager.go b/internal/management/controller/manager.go
index b1c01130d7..426f85fd14 100644
--- a/internal/management/controller/manager.go
+++ b/internal/management/controller/manager.go
@@ -82,18 +82,7 @@ func (r *InstanceReconciler) Instance() *postgres.Instance {
// GetCluster gets the managed cluster through the client
func (r *InstanceReconciler) GetCluster(ctx context.Context) (*apiv1.Cluster, error) {
- var cluster apiv1.Cluster
- err := r.GetClient().Get(ctx,
- types.NamespacedName{
- Namespace: r.instance.GetNamespaceName(),
- Name: r.instance.GetClusterName(),
- },
- &cluster)
- if err != nil {
- return nil, err
- }
-
- return &cluster, nil
+ return getClusterFromInstance(ctx, r.client, r.instance)
}
// GetSecret will get a named secret in the instance namespace
diff --git a/internal/management/controller/publication_controller.go b/internal/management/controller/publication_controller.go
new file mode 100644
index 0000000000..bed200d923
--- /dev/null
+++ b/internal/management/controller/publication_controller.go
@@ -0,0 +1,168 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/cloudnative-pg/machinery/pkg/log"
+ "k8s.io/apimachinery/pkg/runtime"
+ ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+
+ apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
+ "github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres"
+ "github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
+)
+
+// PublicationReconciler reconciles a Publication object
+type PublicationReconciler struct {
+ client.Client
+ Scheme *runtime.Scheme
+
+ instance *postgres.Instance
+ finalizerReconciler *finalizerReconciler[*apiv1.Publication]
+}
+
+// publicationReconciliationInterval is the time between the
+// publication reconciliation loop failures
+const publicationReconciliationInterval = 30 * time.Second
+
+// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=publications,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=publications/status,verbs=get;update;patch
+
+// Reconcile is part of the main kubernetes reconciliation loop which aims to
+// move the current state of the cluster closer to the desired state.
+// TODO(user): Modify the Reconcile function to compare the state specified by
+// the Publication object against the actual cluster state, and then
+// perform operations to make the cluster state reflect the state specified by
+// the user.
+//
+// For more details, check Reconcile and its Result here:
+// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.18.4/pkg/reconcile
+func (r *PublicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+ contextLogger, ctx := log.SetupLogger(ctx)
+
+ contextLogger.Debug("Reconciliation loop start")
+ defer func() {
+ contextLogger.Debug("Reconciliation loop end")
+ }()
+
+ // Get the publication object
+ var publication apiv1.Publication
+ if err := r.Client.Get(ctx, client.ObjectKey{
+ Namespace: req.Namespace,
+ Name: req.Name,
+ }, &publication); err != nil {
+ contextLogger.Trace("Could not fetch Publication", "error", err)
+ return ctrl.Result{}, client.IgnoreNotFound(err)
+ }
+
+ // This is not for me!
+ if publication.Spec.ClusterRef.Name != r.instance.GetClusterName() {
+ contextLogger.Trace("Publication is not for this cluster",
+ "cluster", publication.Spec.ClusterRef.Name,
+ "expected", r.instance.GetClusterName(),
+ )
+ return ctrl.Result{}, nil
+ }
+
+ // Fetch the Cluster from the cache
+ cluster, err := r.GetCluster(ctx)
+ if err != nil {
+ return ctrl.Result{}, markAsFailed(ctx, r.Client, &publication, fmt.Errorf("while fetching the cluster: %w", err))
+ }
+
+ // Still not for me, we're waiting for a switchover
+ if cluster.Status.CurrentPrimary != cluster.Status.TargetPrimary {
+ return ctrl.Result{RequeueAfter: publicationReconciliationInterval}, nil
+ }
+
+ // This is not for me, at least now
+ if cluster.Status.CurrentPrimary != r.instance.GetPodName() {
+ return ctrl.Result{RequeueAfter: publicationReconciliationInterval}, nil
+ }
+
+ // Cannot do anything on a replica cluster
+ if cluster.IsReplica() {
+ markErr := markAsFailed(ctx, r.Client, &publication, errClusterIsReplica)
+ return ctrl.Result{RequeueAfter: publicationReconciliationInterval}, markErr
+ }
+
+ if err := r.finalizerReconciler.reconcile(ctx, &publication); err != nil {
+ return ctrl.Result{RequeueAfter: publicationReconciliationInterval},
+ fmt.Errorf("while reconciling the finalizer: %w", err)
+ }
+
+ // If everything is reconciled, we're done here
+ if publication.Generation == publication.Status.ObservedGeneration {
+ return ctrl.Result{}, nil
+ }
+
+ if err := r.alignPublication(ctx, &publication); err != nil {
+ return ctrl.Result{RequeueAfter: publicationReconciliationInterval}, markAsFailed(ctx, r.Client, &publication, err)
+ }
+
+ return ctrl.Result{RequeueAfter: publicationReconciliationInterval}, markAsReady(ctx, r.Client, &publication)
+}
+
+func (r *PublicationReconciler) evaluateDropPublication(ctx context.Context, pub *apiv1.Publication) error {
+ if pub.Spec.ReclaimPolicy != apiv1.PublicationReclaimDelete {
+ return nil
+ }
+ db, err := r.instance.ConnectionPool().Connection(pub.Spec.DBName)
+ if err != nil {
+ return fmt.Errorf("while getting DB connection: %w", err)
+ }
+
+ return executeDropPublication(ctx, db, pub.Spec.Name)
+}
+
+// NewPublicationReconciler creates a new publication reconciler
+func NewPublicationReconciler(
+ mgr manager.Manager,
+ instance *postgres.Instance,
+) *PublicationReconciler {
+ pr := &PublicationReconciler{
+ Client: mgr.GetClient(),
+ instance: instance,
+ }
+
+ pr.finalizerReconciler = newFinalizerReconciler(
+ mgr.GetClient(),
+ utils.PublicationFinalizerName,
+ pr.evaluateDropPublication,
+ )
+
+ return pr
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (r *PublicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
+ return ctrl.NewControllerManagedBy(mgr).
+ For(&apiv1.Publication{}).
+ Named("instance-publication").
+ Complete(r)
+}
+
+// GetCluster gets the managed cluster through the client
+func (r *PublicationReconciler) GetCluster(ctx context.Context) (*apiv1.Cluster, error) {
+ return getClusterFromInstance(ctx, r.Client, r.instance)
+}
diff --git a/internal/management/controller/publication_controller_sql.go b/internal/management/controller/publication_controller_sql.go
new file mode 100644
index 0000000000..47e3e4e467
--- /dev/null
+++ b/internal/management/controller/publication_controller_sql.go
@@ -0,0 +1,198 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "strings"
+
+ "github.com/jackc/pgx/v5"
+
+ apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
+)
+
+func (r *PublicationReconciler) alignPublication(ctx context.Context, obj *apiv1.Publication) error {
+ db, err := r.instance.ConnectionPool().Connection(obj.Spec.DBName)
+ if err != nil {
+ return fmt.Errorf("while getting DB connection: %w", err)
+ }
+
+ row := db.QueryRowContext(
+ ctx,
+ `
+ SELECT count(*)
+ FROM pg_publication
+ WHERE pubname = $1
+ `,
+ obj.Spec.Name)
+ if row.Err() != nil {
+ return fmt.Errorf("while getting publication status: %w", row.Err())
+ }
+
+ var count int
+ if err := row.Scan(&count); err != nil {
+ return fmt.Errorf("while getting publication status (scan): %w", err)
+ }
+
+ if count > 0 {
+ if err := r.patchPublication(ctx, db, obj); err != nil {
+ return fmt.Errorf("while patching publication: %w", err)
+ }
+ return nil
+ }
+
+ if err := r.createPublication(ctx, db, obj); err != nil {
+ return fmt.Errorf("while creating publication: %w", err)
+ }
+
+ return nil
+}
+
+func (r *PublicationReconciler) patchPublication(
+ ctx context.Context,
+ db *sql.DB,
+ obj *apiv1.Publication,
+) error {
+ sqls := toPublicationAlterSQL(obj)
+ for _, sqlQuery := range sqls {
+ if _, err := db.ExecContext(ctx, sqlQuery); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (r *PublicationReconciler) createPublication(
+ ctx context.Context,
+ db *sql.DB,
+ obj *apiv1.Publication,
+) error {
+ sqls := toPublicationCreateSQL(obj)
+ for _, sqlQuery := range sqls {
+ if _, err := db.ExecContext(ctx, sqlQuery); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func toPublicationCreateSQL(obj *apiv1.Publication) []string {
+ result := make([]string, 0, 2)
+
+ createQuery := fmt.Sprintf(
+ "CREATE PUBLICATION %s %s",
+ pgx.Identifier{obj.Spec.Name}.Sanitize(),
+ toPublicationTargetSQL(&obj.Spec.Target),
+ )
+ if len(obj.Spec.Parameters) > 0 {
+ createQuery = fmt.Sprintf("%s WITH (%s)", createQuery, toPostgresParameters(obj.Spec.Parameters))
+ }
+ result = append(result, createQuery)
+
+ if len(obj.Spec.Owner) > 0 {
+ result = append(result,
+ fmt.Sprintf(
+ "ALTER PUBLICATION %s OWNER to %s",
+ pgx.Identifier{obj.Spec.Name}.Sanitize(),
+ pgx.Identifier{obj.Spec.Owner}.Sanitize(),
+ ),
+ )
+ }
+
+ return result
+}
+
+func toPublicationAlterSQL(obj *apiv1.Publication) []string {
+ result := make([]string, 0, 3)
+
+ if len(obj.Spec.Target.Objects) > 0 {
+ result = append(result,
+ fmt.Sprintf(
+ "ALTER PUBLICATION %s SET %s",
+ pgx.Identifier{obj.Spec.Name}.Sanitize(),
+ toPublicationTargetObjectsSQL(&obj.Spec.Target),
+ ),
+ )
+ }
+
+ if len(obj.Spec.Owner) > 0 {
+ result = append(result,
+ fmt.Sprintf(
+ "ALTER PUBLICATION %s OWNER TO %s",
+ pgx.Identifier{obj.Spec.Name}.Sanitize(),
+ pgx.Identifier{obj.Spec.Owner}.Sanitize(),
+ ),
+ )
+ }
+
+ if len(obj.Spec.Parameters) > 0 {
+ result = append(result,
+ fmt.Sprintf(
+ "ALTER PUBLICATION %s SET (%s)",
+ pgx.Identifier{obj.Spec.Name}.Sanitize(),
+ toPostgresParameters(obj.Spec.Parameters),
+ ),
+ )
+ }
+
+ return result
+}
+
+func executeDropPublication(ctx context.Context, db *sql.DB, name string) error {
+ if _, err := db.ExecContext(
+ ctx,
+ fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", pgx.Identifier{name}.Sanitize()),
+ ); err != nil {
+ return fmt.Errorf("while dropping publication: %w", err)
+ }
+
+ return nil
+}
+
+func toPublicationTargetSQL(obj *apiv1.PublicationTarget) string {
+ if obj.AllTables {
+ return "FOR ALL TABLES"
+ }
+
+ return toPublicationTargetObjectsSQL(obj)
+}
+
+func toPublicationTargetObjectsSQL(obj *apiv1.PublicationTarget) string {
+ result := ""
+ for _, object := range obj.Objects {
+ if len(result) > 0 {
+ result += ", "
+ }
+ result += toPublicationObjectSQL(&object)
+ }
+
+ if len(result) > 0 {
+ result = fmt.Sprintf("FOR %s", result)
+ }
+ return result
+}
+
+func toPublicationObjectSQL(obj *apiv1.PublicationTargetObject) string {
+ if len(obj.Schema) > 0 {
+ return fmt.Sprintf("TABLES IN SCHEMA %s", pgx.Identifier{obj.Schema}.Sanitize())
+ }
+
+ return fmt.Sprintf("TABLE %s", strings.Join(obj.TableExpression, ", "))
+}
diff --git a/internal/management/controller/publication_controller_sql_test.go b/internal/management/controller/publication_controller_sql_test.go
new file mode 100644
index 0000000000..ce2142a796
--- /dev/null
+++ b/internal/management/controller/publication_controller_sql_test.go
@@ -0,0 +1,179 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// nolint: dupl
+package controller
+
+import (
+ "database/sql"
+ "fmt"
+
+ "github.com/DATA-DOG/go-sqlmock"
+ "github.com/jackc/pgx/v5"
+
+ apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
+
+var _ = Describe("publication sql", func() {
+ var (
+ dbMock sqlmock.Sqlmock
+ db *sql.DB
+ )
+
+ BeforeEach(func() {
+ var err error
+ db, dbMock, err = sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ AfterEach(func() {
+ Expect(dbMock.ExpectationsWereMet()).To(Succeed())
+ })
+
+ It("drops the publication successfully", func(ctx SpecContext) {
+ dbMock.ExpectExec(fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", pgx.Identifier{"publication_name"}.Sanitize())).
+ WillReturnResult(sqlmock.NewResult(1, 1))
+
+ err := executeDropPublication(ctx, db, "publication_name")
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ It("returns an error when dropping the publication fails", func(ctx SpecContext) {
+ dbMock.ExpectExec(fmt.Sprintf("DROP PUBLICATION IF EXISTS %s",
+ pgx.Identifier{"publication_name"}.Sanitize())).
+ WillReturnError(fmt.Errorf("drop publication error"))
+
+ err := executeDropPublication(ctx, db, "publication_name")
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("while dropping publication: drop publication error"))
+ })
+
+ It("sanitizes the publication name correctly", func(ctx SpecContext) {
+ dbMock.ExpectExec(
+ fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", pgx.Identifier{"sanitized_name"}.Sanitize())).
+ WillReturnResult(sqlmock.NewResult(1, 1))
+
+ err := executeDropPublication(ctx, db, "sanitized_name")
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ It("generates correct SQL for altering publication with target objects", func() {
+ obj := &apiv1.Publication{
+ Spec: apiv1.PublicationSpec{
+ Name: "test_pub",
+ Target: apiv1.PublicationTarget{
+ Objects: []apiv1.PublicationTargetObject{
+ {Schema: "public"},
+ },
+ },
+ },
+ }
+
+ sqls := toPublicationAlterSQL(obj)
+ Expect(sqls).To(ContainElement("ALTER PUBLICATION \"test_pub\" SET FOR TABLES IN SCHEMA \"public\""))
+ })
+
+ It("generates correct SQL for altering publication with owner", func() {
+ obj := &apiv1.Publication{
+ Spec: apiv1.PublicationSpec{
+ Name: "test_pub",
+ Owner: "new_owner",
+ },
+ }
+
+ sqls := toPublicationAlterSQL(obj)
+ Expect(sqls).To(ContainElement("ALTER PUBLICATION \"test_pub\" OWNER TO \"new_owner\""))
+ })
+
+ It("generates correct SQL for altering publication with parameters", func() {
+ obj := &apiv1.Publication{
+ Spec: apiv1.PublicationSpec{
+ Name: "test_pub",
+ Parameters: map[string]string{
+ "param1": "value1",
+ "param2": "value2",
+ },
+ },
+ }
+
+ sqls := toPublicationAlterSQL(obj)
+ Expect(sqls).To(ContainElement("ALTER PUBLICATION \"test_pub\" SET (param1 = 'value1', param2 = 'value2')"))
+ })
+
+ It("returns empty SQL list when no alterations are needed", func() {
+ obj := &apiv1.Publication{
+ Spec: apiv1.PublicationSpec{
+ Name: "test_pub",
+ },
+ }
+
+ sqls := toPublicationAlterSQL(obj)
+ Expect(sqls).To(BeEmpty())
+ })
+
+ It("generates correct SQL for creating publication with target objects", func() {
+ obj := &apiv1.Publication{
+ Spec: apiv1.PublicationSpec{
+ Name: "test_pub",
+ Target: apiv1.PublicationTarget{
+ Objects: []apiv1.PublicationTargetObject{
+ {Schema: "public"},
+ },
+ },
+ },
+ }
+
+ sqls := toPublicationCreateSQL(obj)
+ Expect(sqls).To(ContainElement("CREATE PUBLICATION \"test_pub\" FOR TABLES IN SCHEMA \"public\""))
+ })
+
+ It("generates correct SQL for creating publication with owner", func() {
+ obj := &apiv1.Publication{
+ Spec: apiv1.PublicationSpec{
+ Name: "test_pub",
+ Owner: "new_owner",
+ },
+ }
+
+ sqls := toPublicationCreateSQL(obj)
+ Expect(sqls).To(ContainElement("ALTER PUBLICATION \"test_pub\" OWNER to \"new_owner\""))
+ })
+
+ It("generates correct SQL for creating publication with parameters", func() {
+ obj := &apiv1.Publication{
+ Spec: apiv1.PublicationSpec{
+ Name: "test_pub",
+ Parameters: map[string]string{
+ "param1": "value1",
+ "param2": "value2",
+ },
+ Target: apiv1.PublicationTarget{
+ Objects: []apiv1.PublicationTargetObject{
+ {Schema: "public"},
+ },
+ },
+ },
+ }
+
+ sqls := toPublicationCreateSQL(obj)
+ Expect(sqls).To(ContainElement(
+ "CREATE PUBLICATION \"test_pub\" FOR TABLES IN SCHEMA \"public\" WITH (param1 = 'value1', param2 = 'value2')",
+ ))
+ })
+})
diff --git a/internal/management/controller/subscription_controller.go b/internal/management/controller/subscription_controller.go
new file mode 100644
index 0000000000..e6fcce8ded
--- /dev/null
+++ b/internal/management/controller/subscription_controller.go
@@ -0,0 +1,182 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/cloudnative-pg/machinery/pkg/log"
+ "k8s.io/apimachinery/pkg/runtime"
+ ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+
+ apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
+ "github.com/cloudnative-pg/cloudnative-pg/pkg/management/external"
+ "github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres"
+ "github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
+)
+
+// SubscriptionReconciler reconciles a Subscription object
+type SubscriptionReconciler struct {
+ client.Client
+ Scheme *runtime.Scheme
+
+ instance *postgres.Instance
+ finalizerReconciler *finalizerReconciler[*apiv1.Subscription]
+}
+
+// subscriptionReconciliationInterval is the time between the
+// subscription reconciliation loop failures
+const subscriptionReconciliationInterval = 30 * time.Second
+
+// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=subscriptions,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=subscriptions/status,verbs=get;update;patch
+
+// Reconcile is the subscription reconciliation loop
+func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+ contextLogger, ctx := log.SetupLogger(ctx)
+
+ contextLogger.Debug("Reconciliation loop start")
+ defer func() {
+ contextLogger.Debug("Reconciliation loop end")
+ }()
+
+ // Get the subscription object
+ var subscription apiv1.Subscription
+ if err := r.Client.Get(ctx, client.ObjectKey{
+ Namespace: req.Namespace,
+ Name: req.Name,
+ }, &subscription); err != nil {
+ contextLogger.Trace("Could not fetch Subscription", "error", err)
+ return ctrl.Result{}, client.IgnoreNotFound(err)
+ }
+
+ // This is not for me!
+ if subscription.Spec.ClusterRef.Name != r.instance.GetClusterName() {
+ contextLogger.Trace("Subscription is not for this cluster",
+ "cluster", subscription.Spec.ClusterRef.Name,
+ "expected", r.instance.GetClusterName(),
+ )
+ return ctrl.Result{}, nil
+ }
+
+ // Fetch the Cluster from the cache
+ cluster, err := r.GetCluster(ctx)
+ if err != nil {
+ return ctrl.Result{}, markAsFailed(ctx, r.Client, &subscription, fmt.Errorf("while fetching the cluster: %w", err))
+ }
+
+ // Still not for me, we're waiting for a switchover
+ if cluster.Status.CurrentPrimary != cluster.Status.TargetPrimary {
+ return ctrl.Result{RequeueAfter: subscriptionReconciliationInterval}, nil
+ }
+
+ // This is not for me, at least now
+ if cluster.Status.CurrentPrimary != r.instance.GetPodName() {
+ return ctrl.Result{RequeueAfter: subscriptionReconciliationInterval}, nil
+ }
+
+ // Cannot do anything on a replica cluster
+ if cluster.IsReplica() {
+ err := markAsFailed(ctx, r.Client, &subscription, errClusterIsReplica)
+ return ctrl.Result{RequeueAfter: subscriptionReconciliationInterval}, err
+ }
+
+ if err := r.finalizerReconciler.reconcile(ctx, &subscription); err != nil {
+ return ctrl.Result{RequeueAfter: subscriptionReconciliationInterval},
+ fmt.Errorf("while reconciling the finalizer: %w", err)
+ }
+
+ // If everything is reconciled, we're done here
+ if subscription.Generation == subscription.Status.ObservedGeneration {
+ return ctrl.Result{}, nil
+ }
+
+ // Let's get the connection string
+ connString, err := getSubscriptionConnectionString(
+ cluster,
+ subscription.Spec.ExternalClusterName,
+ "", // TODO: should we have a way to force dbname?
+ )
+ if err != nil {
+ return ctrl.Result{RequeueAfter: subscriptionReconciliationInterval}, markAsFailed(ctx, r.Client, &subscription, err)
+ }
+
+ if err := r.alignSubscription(ctx, &subscription, connString); err != nil {
+ return ctrl.Result{RequeueAfter: subscriptionReconciliationInterval}, markAsFailed(ctx, r.Client, &subscription, err)
+ }
+
+ return ctrl.Result{RequeueAfter: subscriptionReconciliationInterval}, markAsReady(ctx, r.Client, &subscription)
+}
+
+func (r *SubscriptionReconciler) evaluateDropSubscription(ctx context.Context, sub *apiv1.Subscription) error {
+ if sub.Spec.ReclaimPolicy != apiv1.SubscriptionReclaimDelete {
+ return nil
+ }
+
+ db, err := r.instance.ConnectionPool().Connection(sub.Spec.DBName)
+ if err != nil {
+ return fmt.Errorf("while getting DB connection: %w", err)
+ }
+ return executeDropSubscription(ctx, db, sub.Spec.Name)
+}
+
+// NewSubscriptionReconciler creates a new subscription reconciler
+func NewSubscriptionReconciler(
+ mgr manager.Manager,
+ instance *postgres.Instance,
+) *SubscriptionReconciler {
+ sr := &SubscriptionReconciler{Client: mgr.GetClient(), instance: instance}
+ sr.finalizerReconciler = newFinalizerReconciler(
+ mgr.GetClient(),
+ utils.SubscriptionFinalizerName,
+ sr.evaluateDropSubscription,
+ )
+
+ return sr
+}
+
+// SetupWithManager sets up the controller with the Manager
+func (r *SubscriptionReconciler) SetupWithManager(mgr ctrl.Manager) error {
+ return ctrl.NewControllerManagedBy(mgr).
+ For(&apiv1.Subscription{}).
+ Named("instance-subscription").
+ Complete(r)
+}
+
+// GetCluster gets the managed cluster through the client
+func (r *SubscriptionReconciler) GetCluster(ctx context.Context) (*apiv1.Cluster, error) {
+ return getClusterFromInstance(ctx, r.Client, r.instance)
+}
+
+// getSubscriptionConnectionString gets the connection string to be used to connect to
+// the specified external cluster, while connected to a pod of the specified cluster
+func getSubscriptionConnectionString(
+ cluster *apiv1.Cluster,
+ externalClusterName string,
+ databaseName string,
+) (string, error) {
+ externalCluster, ok := cluster.ExternalCluster(externalClusterName)
+ if !ok {
+ return "", fmt.Errorf("externalCluster '%s' not declared in cluster %s", externalClusterName, cluster.Name)
+ }
+
+ return external.GetServerConnectionString(&externalCluster, databaseName), nil
+}
diff --git a/internal/management/controller/subscription_controller_sql.go b/internal/management/controller/subscription_controller_sql.go
new file mode 100644
index 0000000000..fb12099d93
--- /dev/null
+++ b/internal/management/controller/subscription_controller_sql.go
@@ -0,0 +1,178 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+
+ "github.com/jackc/pgx/v5"
+ "github.com/lib/pq"
+
+ apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
+)
+
+func (r *SubscriptionReconciler) alignSubscription(
+ ctx context.Context,
+ obj *apiv1.Subscription,
+ connString string,
+) error {
+ db, err := r.instance.ConnectionPool().Connection(obj.Spec.DBName)
+ if err != nil {
+ return fmt.Errorf("while getting DB connection: %w", err)
+ }
+
+ row := db.QueryRowContext(
+ ctx,
+ `
+ SELECT count(*)
+ FROM pg_subscription
+ WHERE subname = $1
+ `,
+ obj.Spec.Name)
+ if row.Err() != nil {
+ return fmt.Errorf("while getting subscription status: %w", row.Err())
+ }
+
+ var count int
+ if err := row.Scan(&count); err != nil {
+ return fmt.Errorf("while getting subscription status (scan): %w", err)
+ }
+
+ if count > 0 {
+ if err := r.patchSubscription(ctx, db, obj, connString); err != nil {
+ return fmt.Errorf("while patching subscription: %w", err)
+ }
+ return nil
+ }
+
+ if err := r.createSubscription(ctx, db, obj, connString); err != nil {
+ return fmt.Errorf("while creating subscription: %w", err)
+ }
+
+ return nil
+}
+
+func (r *SubscriptionReconciler) patchSubscription(
+ ctx context.Context,
+ db *sql.DB,
+ obj *apiv1.Subscription,
+ connString string,
+) error {
+ sqls := toSubscriptionAlterSQL(obj, connString)
+ for _, sqlQuery := range sqls {
+ if _, err := db.ExecContext(ctx, sqlQuery); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (r *SubscriptionReconciler) createSubscription(
+ ctx context.Context,
+ db *sql.DB,
+ obj *apiv1.Subscription,
+ connString string,
+) error {
+ sqls := toSubscriptionCreateSQL(obj, connString)
+ for _, sqlQuery := range sqls {
+ if _, err := db.ExecContext(ctx, sqlQuery); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func toSubscriptionCreateSQL(obj *apiv1.Subscription, connString string) []string {
+ result := make([]string, 0, 2)
+
+ createQuery := fmt.Sprintf(
+ "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s",
+ pgx.Identifier{obj.Spec.Name}.Sanitize(),
+ pq.QuoteLiteral(connString),
+ pgx.Identifier{obj.Spec.PublicationName}.Sanitize(),
+ )
+ if len(obj.Spec.Parameters) > 0 {
+ createQuery = fmt.Sprintf("%s WITH (%s)", createQuery, toPostgresParameters(obj.Spec.Parameters))
+ }
+ result = append(result, createQuery)
+
+ if len(obj.Spec.Owner) > 0 {
+ result = append(result,
+ fmt.Sprintf(
+ "ALTER SUBSCRIPTION %s OWNER TO %s",
+ pgx.Identifier{obj.Spec.Name}.Sanitize(),
+ pgx.Identifier{obj.Spec.Owner}.Sanitize(),
+ ),
+ )
+ }
+
+ return result
+}
+
+func toSubscriptionAlterSQL(obj *apiv1.Subscription, connString string) []string {
+ result := make([]string, 0, 4)
+
+ setPublicationSQL := fmt.Sprintf(
+ "ALTER SUBSCRIPTION %s SET PUBLICATION %s",
+ pgx.Identifier{obj.Spec.Name}.Sanitize(),
+ pgx.Identifier{obj.Spec.PublicationName}.Sanitize(),
+ )
+
+ setConnStringSQL := fmt.Sprintf(
+ "ALTER SUBSCRIPTION %s CONNECTION %s",
+ pgx.Identifier{obj.Spec.Name}.Sanitize(),
+ pq.QuoteLiteral(connString),
+ )
+ result = append(result, setPublicationSQL, setConnStringSQL)
+
+ if len(obj.Spec.Owner) > 0 {
+ result = append(result,
+ fmt.Sprintf(
+ "ALTER SUBSCRIPTION %s OWNER TO %s",
+ pgx.Identifier{obj.Spec.Name}.Sanitize(),
+ pgx.Identifier{obj.Spec.Owner}.Sanitize(),
+ ),
+ )
+ }
+
+ if len(obj.Spec.Parameters) > 0 {
+ result = append(result,
+ fmt.Sprintf(
+ "ALTER SUBSCRIPTION %s SET (%s)",
+ pgx.Identifier{obj.Spec.Name}.Sanitize(),
+ toPostgresParameters(obj.Spec.Parameters),
+ ),
+ )
+ }
+
+ return result
+}
+
+func executeDropSubscription(ctx context.Context, db *sql.DB, name string) error {
+ if _, err := db.ExecContext(
+ ctx,
+ fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", pgx.Identifier{name}.Sanitize()),
+ ); err != nil {
+ return fmt.Errorf("while dropping subscription: %w", err)
+ }
+
+ return nil
+}
diff --git a/internal/management/controller/subscription_controller_sql_test.go b/internal/management/controller/subscription_controller_sql_test.go
new file mode 100644
index 0000000000..5c40e114b3
--- /dev/null
+++ b/internal/management/controller/subscription_controller_sql_test.go
@@ -0,0 +1,201 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// nolint: dupl
+package controller
+
+import (
+ "database/sql"
+ "fmt"
+
+ "github.com/DATA-DOG/go-sqlmock"
+ "github.com/jackc/pgx/v5"
+
+ apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
+
+// nolint: dupl
+var _ = Describe("subscription sql", func() {
+ var (
+ dbMock sqlmock.Sqlmock
+ db *sql.DB
+ )
+
+ BeforeEach(func() {
+ var err error
+ db, dbMock, err = sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ AfterEach(func() {
+ Expect(dbMock.ExpectationsWereMet()).To(Succeed())
+ })
+
+ It("drops the subscription successfully", func(ctx SpecContext) {
+ dbMock.ExpectExec(fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", pgx.Identifier{"subscription_name"}.Sanitize())).
+ WillReturnResult(sqlmock.NewResult(1, 1))
+
+ err := executeDropSubscription(ctx, db, "subscription_name")
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ It("returns an error when dropping the subscription fails", func(ctx SpecContext) {
+ dbMock.ExpectExec(fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", pgx.Identifier{"subscription_name"}.Sanitize())).
+ WillReturnError(fmt.Errorf("drop subscription error"))
+
+ err := executeDropSubscription(ctx, db, "subscription_name")
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("while dropping subscription: drop subscription error"))
+ })
+
+ It("sanitizes the subscription name correctly", func(ctx SpecContext) {
+ dbMock.ExpectExec(fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", pgx.Identifier{"sanitized_name"}.Sanitize())).
+ WillReturnResult(sqlmock.NewResult(1, 1))
+
+ err := executeDropSubscription(ctx, db, "sanitized_name")
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ It("generates correct SQL for creating subscription with publication and connection string", func() {
+ obj := &apiv1.Subscription{
+ Spec: apiv1.SubscriptionSpec{
+ Name: "test_sub",
+ PublicationName: "test_pub",
+ },
+ }
+ connString := "host=localhost user=test dbname=test"
+
+ sqls := toSubscriptionCreateSQL(obj, connString)
+ Expect(sqls).To(ContainElement(
+ "CREATE SUBSCRIPTION \"test_sub\" CONNECTION 'host=localhost user=test dbname=test' PUBLICATION \"test_pub\""))
+ })
+
+ It("generates correct SQL for creating subscription with parameters", func() {
+ obj := &apiv1.Subscription{
+ Spec: apiv1.SubscriptionSpec{
+ Name: "test_sub",
+ PublicationName: "test_pub",
+ Parameters: map[string]string{
+ "param1": "value1",
+ "param2": "value2",
+ },
+ },
+ }
+ connString := "host=localhost user=test dbname=test"
+
+ sqls := toSubscriptionCreateSQL(obj, connString)
+ expectedElement := "CREATE SUBSCRIPTION \"test_sub\" " +
+ "CONNECTION 'host=localhost user=test dbname=test' " +
+ "PUBLICATION \"test_pub\" WITH (param1 = 'value1', param2 = 'value2')"
+ Expect(sqls).To(ContainElement(expectedElement))
+ })
+
+ It("generates correct SQL for creating subscription with owner", func() {
+ obj := &apiv1.Subscription{
+ Spec: apiv1.SubscriptionSpec{
+ Name: "test_sub",
+ PublicationName: "test_pub",
+ Owner: "new_owner",
+ },
+ }
+ connString := "host=localhost user=test dbname=test"
+
+ sqls := toSubscriptionCreateSQL(obj, connString)
+ Expect(sqls).To(ContainElement(
+ "CREATE SUBSCRIPTION \"test_sub\" CONNECTION 'host=localhost user=test dbname=test' PUBLICATION \"test_pub\""))
+ Expect(sqls).To(ContainElement("ALTER SUBSCRIPTION \"test_sub\" OWNER TO \"new_owner\""))
+ })
+
+ It("returns correct SQL for creating subscription with no owner or parameters", func() {
+ obj := &apiv1.Subscription{
+ Spec: apiv1.SubscriptionSpec{
+ Name: "test_sub",
+ PublicationName: "test_pub",
+ },
+ }
+ connString := "host=localhost user=test dbname=test"
+
+ sqls := toSubscriptionCreateSQL(obj, connString)
+ Expect(sqls).To(ContainElement(
+ "CREATE SUBSCRIPTION \"test_sub\" CONNECTION 'host=localhost user=test dbname=test' PUBLICATION \"test_pub\""))
+ })
+
+ It("generates correct SQL for altering subscription with publication and connection string", func() {
+ obj := &apiv1.Subscription{
+ Spec: apiv1.SubscriptionSpec{
+ Name: "test_sub",
+ PublicationName: "test_pub",
+ },
+ }
+ connString := "host=localhost user=test dbname=test"
+
+ sqls := toSubscriptionAlterSQL(obj, connString)
+ Expect(sqls).To(ContainElement("ALTER SUBSCRIPTION \"test_sub\" SET PUBLICATION \"test_pub\""))
+ Expect(sqls).To(ContainElement("ALTER SUBSCRIPTION \"test_sub\" CONNECTION 'host=localhost user=test dbname=test'"))
+ })
+
+ It("generates correct SQL for altering subscription with owner", func() {
+ obj := &apiv1.Subscription{
+ Spec: apiv1.SubscriptionSpec{
+ Name: "test_sub",
+ PublicationName: "test_pub",
+ Owner: "new_owner",
+ },
+ }
+ connString := "host=localhost user=test dbname=test"
+
+ sqls := toSubscriptionAlterSQL(obj, connString)
+ Expect(sqls).To(ContainElement("ALTER SUBSCRIPTION \"test_sub\" SET PUBLICATION \"test_pub\""))
+ Expect(sqls).To(ContainElement("ALTER SUBSCRIPTION \"test_sub\" CONNECTION 'host=localhost user=test dbname=test'"))
+ Expect(sqls).To(ContainElement("ALTER SUBSCRIPTION \"test_sub\" OWNER TO \"new_owner\""))
+ })
+
+ It("generates correct SQL for altering subscription with parameters", func() {
+ obj := &apiv1.Subscription{
+ Spec: apiv1.SubscriptionSpec{
+ Name: "test_sub",
+ PublicationName: "test_pub",
+ Parameters: map[string]string{
+ "param1": "value1",
+ "param2": "value2",
+ },
+ },
+ }
+ connString := "host=localhost user=test dbname=test"
+
+ sqls := toSubscriptionAlterSQL(obj, connString)
+ Expect(sqls).To(ContainElement("ALTER SUBSCRIPTION \"test_sub\" SET PUBLICATION \"test_pub\""))
+ Expect(sqls).To(ContainElement("ALTER SUBSCRIPTION \"test_sub\" CONNECTION 'host=localhost user=test dbname=test'"))
+ Expect(sqls).To(ContainElement("ALTER SUBSCRIPTION \"test_sub\" SET (param1 = 'value1', param2 = 'value2')"))
+ })
+
+ It("returns correct SQL for altering subscription with no owner or parameters", func() {
+ obj := &apiv1.Subscription{
+ Spec: apiv1.SubscriptionSpec{
+ Name: "test_sub",
+ PublicationName: "test_pub",
+ },
+ }
+ connString := "host=localhost user=test dbname=test"
+
+ sqls := toSubscriptionAlterSQL(obj, connString)
+ Expect(sqls).To(ContainElement("ALTER SUBSCRIPTION \"test_sub\" SET PUBLICATION \"test_pub\""))
+ Expect(sqls).To(ContainElement("ALTER SUBSCRIPTION \"test_sub\" CONNECTION 'host=localhost user=test dbname=test'"))
+ })
+})
diff --git a/internal/management/controller/subscription_controller_test.go b/internal/management/controller/subscription_controller_test.go
new file mode 100644
index 0000000000..230dc496ea
--- /dev/null
+++ b/internal/management/controller/subscription_controller_test.go
@@ -0,0 +1,35 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
+
+var _ = Describe("Conversion of PG parameters from map to string of key/value pairs", func() {
+ It("returns expected well-formed list", func() {
+ m := map[string]string{
+ "a": "1", "b": "2",
+ }
+ res := toPostgresParameters(m)
+ Expect(res).To(BeElementOf([]string{
+ `a = '1', b = '2'`,
+ `b = '2', a = '1'`,
+ }))
+ })
+})
diff --git a/pkg/specs/roles.go b/pkg/specs/roles.go
index f0d9bf4cb1..ac328cc66e 100644
--- a/pkg/specs/roles.go
+++ b/pkg/specs/roles.go
@@ -154,6 +154,62 @@ func CreateRole(cluster apiv1.Cluster, backupOrigin *apiv1.Backup) rbacv1.Role {
"update",
},
},
+ {
+ APIGroups: []string{
+ "postgresql.cnpg.io",
+ },
+ Resources: []string{
+ "publications",
+ },
+ Verbs: []string{
+ "get",
+ "update",
+ "list",
+ "watch",
+ },
+ ResourceNames: []string{},
+ },
+ {
+ APIGroups: []string{
+ "postgresql.cnpg.io",
+ },
+ Resources: []string{
+ "publications/status",
+ },
+ Verbs: []string{
+ "get",
+ "patch",
+ "update",
+ },
+ },
+ {
+ APIGroups: []string{
+ "postgresql.cnpg.io",
+ },
+ Resources: []string{
+ "subscriptions",
+ },
+ Verbs: []string{
+ "get",
+ "update",
+ "list",
+ "watch",
+ },
+ ResourceNames: []string{},
+ },
+ {
+ APIGroups: []string{
+ "postgresql.cnpg.io",
+ },
+ Resources: []string{
+ "subscriptions/status",
+ },
+ Verbs: []string{
+ "get",
+ "patch",
+ "update",
+ },
+ },
}
return rbacv1.Role{
diff --git a/pkg/specs/roles_test.go b/pkg/specs/roles_test.go
index 3753a66154..0d3df97d28 100644
--- a/pkg/specs/roles_test.go
+++ b/pkg/specs/roles_test.go
@@ -165,7 +165,7 @@ var _ = Describe("Roles", func() {
serviceAccount := CreateRole(cluster, nil)
Expect(serviceAccount.Name).To(Equal(cluster.Name))
Expect(serviceAccount.Namespace).To(Equal(cluster.Namespace))
- Expect(serviceAccount.Rules).To(HaveLen(9))
+ Expect(serviceAccount.Rules).To(HaveLen(13))
})
It("should contain every secret of the origin backup and backup configuration of every external cluster", func() {
diff --git a/pkg/utils/finalizers.go b/pkg/utils/finalizers.go
index 81d958df6d..ba9ed64f16 100644
--- a/pkg/utils/finalizers.go
+++ b/pkg/utils/finalizers.go
@@ -20,4 +20,12 @@ const (
// DatabaseFinalizerName is the name of the finalizer
// triggering the deletion of the database
DatabaseFinalizerName = MetadataNamespace + "/deleteDatabase"
+
+ // PublicationFinalizerName is the name of the finalizer
+ // triggering the deletion of the publication
+ PublicationFinalizerName = MetadataNamespace + "/deletePublication"
+
+ // SubscriptionFinalizerName is the name of the finalizer
+ // triggering the deletion of the subscription
+ SubscriptionFinalizerName = MetadataNamespace + "/deleteSubscription"
)
diff --git a/tests/e2e/fixtures/declarative_pub_sub/destination-cluster.yaml.template b/tests/e2e/fixtures/declarative_pub_sub/destination-cluster.yaml.template
new file mode 100644
index 0000000000..1597981714
--- /dev/null
+++ b/tests/e2e/fixtures/declarative_pub_sub/destination-cluster.yaml.template
@@ -0,0 +1,48 @@
+apiVersion: postgresql.cnpg.io/v1
+kind: Cluster
+metadata:
+ name: destination-cluster
+spec:
+ instances: 1
+ externalClusters:
+ - name: source-cluster
+ connectionParameters:
+ host: source-cluster-rw
+ user: app
+ dbname: declarative
+ port: "5432"
+ password:
+ name: source-cluster-app
+ key: password
+
+ postgresql:
+ parameters:
+ max_connections: "110"
+ log_checkpoints: "on"
+ log_lock_waits: "on"
+ log_min_duration_statement: '1000'
+ log_statement: 'ddl'
+ log_temp_files: '1024'
+ log_autovacuum_min_duration: '1s'
+ log_replication_commands: 'on'
+
+ # Example of rolling update strategy:
+ # - unsupervised: automated update of the primary once all
+ # replicas have been upgraded (default)
+ # - supervised: requires manual supervision to perform
+ # the switchover of the primary
+ primaryUpdateStrategy: unsupervised
+ primaryUpdateMethod: switchover
+
+ bootstrap:
+ initdb:
+ database: app
+ owner: app
+
+ # Persistent storage configuration
+ storage:
+ storageClass: ${E2E_DEFAULT_STORAGE_CLASS}
+ size: 1Gi
+ walStorage:
+ storageClass: ${E2E_DEFAULT_STORAGE_CLASS}
+ size: 1Gi
diff --git a/tests/e2e/fixtures/declarative_pub_sub/destination-database.yaml b/tests/e2e/fixtures/declarative_pub_sub/destination-database.yaml
new file mode 100644
index 0000000000..2a6e122647
--- /dev/null
+++ b/tests/e2e/fixtures/declarative_pub_sub/destination-database.yaml
@@ -0,0 +1,9 @@
+apiVersion: postgresql.cnpg.io/v1
+kind: Database
+metadata:
+ name: destination-db-declarative
+spec:
+ name: declarative
+ owner: app
+ cluster:
+ name: destination-cluster
diff --git a/tests/e2e/fixtures/declarative_pub_sub/pub.yaml b/tests/e2e/fixtures/declarative_pub_sub/pub.yaml
new file mode 100644
index 0000000000..bd09d64014
--- /dev/null
+++ b/tests/e2e/fixtures/declarative_pub_sub/pub.yaml
@@ -0,0 +1,11 @@
+apiVersion: postgresql.cnpg.io/v1
+kind: Publication
+metadata:
+ name: publication-declarative
+spec:
+ name: pub
+ dbname: declarative
+ cluster:
+ name: source-cluster
+ target:
+ allTables: true
diff --git a/tests/e2e/fixtures/declarative_pub_sub/source-cluster.yaml.template b/tests/e2e/fixtures/declarative_pub_sub/source-cluster.yaml.template
new file mode 100644
index 0000000000..398a6613c8
--- /dev/null
+++ b/tests/e2e/fixtures/declarative_pub_sub/source-cluster.yaml.template
@@ -0,0 +1,48 @@
+apiVersion: postgresql.cnpg.io/v1
+kind: Cluster
+metadata:
+ name: source-cluster
+spec:
+ instances: 1
+
+ postgresql:
+ parameters:
+ max_connections: "110"
+ log_checkpoints: "on"
+ log_lock_waits: "on"
+ log_min_duration_statement: '1000'
+ log_statement: 'ddl'
+ log_temp_files: '1024'
+ log_autovacuum_min_duration: '1s'
+ log_replication_commands: 'on'
+ pg_hba:
+ - hostssl replication app all scram-sha-256
+
+ managed:
+ roles:
+ - name: app
+ ensure: present
+ login: true
+ replication: true
+
+
+ # Example of rolling update strategy:
+ # - unsupervised: automated update of the primary once all
+ # replicas have been upgraded (default)
+ # - supervised: requires manual supervision to perform
+ # the switchover of the primary
+ primaryUpdateStrategy: unsupervised
+ primaryUpdateMethod: switchover
+
+ bootstrap:
+ initdb:
+ database: app
+ owner: app
+
+ # Persistent storage configuration
+ storage:
+ storageClass: ${E2E_DEFAULT_STORAGE_CLASS}
+ size: 1Gi
+ walStorage:
+ storageClass: ${E2E_DEFAULT_STORAGE_CLASS}
+ size: 1Gi
diff --git a/tests/e2e/fixtures/declarative_pub_sub/source-database.yaml b/tests/e2e/fixtures/declarative_pub_sub/source-database.yaml
new file mode 100644
index 0000000000..80d5a4cf27
--- /dev/null
+++ b/tests/e2e/fixtures/declarative_pub_sub/source-database.yaml
@@ -0,0 +1,9 @@
+apiVersion: postgresql.cnpg.io/v1
+kind: Database
+metadata:
+ name: source-db-declarative
+spec:
+ name: declarative
+ owner: app
+ cluster:
+ name: source-cluster
diff --git a/tests/e2e/fixtures/declarative_pub_sub/sub.yaml b/tests/e2e/fixtures/declarative_pub_sub/sub.yaml
new file mode 100644
index 0000000000..8eb5aabdc4
--- /dev/null
+++ b/tests/e2e/fixtures/declarative_pub_sub/sub.yaml
@@ -0,0 +1,11 @@
+apiVersion: postgresql.cnpg.io/v1
+kind: Subscription
+metadata:
+ name: subscription-declarative
+spec:
+ name: sub
+ dbname: declarative
+ publicationName: pub
+ cluster:
+ name: destination-cluster
+ externalClusterName: source-cluster
diff --git a/tests/e2e/publication_subscription_test.go b/tests/e2e/publication_subscription_test.go
new file mode 100644
index 0000000000..2df531cacb
--- /dev/null
+++ b/tests/e2e/publication_subscription_test.go
@@ -0,0 +1,264 @@
+/*
+Copyright The CloudNativePG Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+ "fmt"
+ "time"
+
+ apierrs "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+
+ apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
+ "github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
+ "github.com/cloudnative-pg/cloudnative-pg/tests"
+ testUtils "github.com/cloudnative-pg/cloudnative-pg/tests/utils"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
+
+// - spinning up a cluster, apply a declarative publication/subscription on it
+
+// Set of tests in which we use the declarative publication and subscription CRDs on an existing cluster
+var _ = Describe("Publication and Subscription", Label(tests.LabelDeclarativePubSub), func() {
+ const (
+ sourceClusterManifest = fixturesDir + "/declarative_pub_sub/source-cluster.yaml.template"
+ destinationClusterManifest = fixturesDir + "/declarative_pub_sub/destination-cluster.yaml.template"
+ sourceDatabaseManifest = fixturesDir + "/declarative_pub_sub/source-database.yaml"
+ destinationDatabaseManifest = fixturesDir + "/declarative_pub_sub/destination-database.yaml"
+ pubManifest = fixturesDir + "/declarative_pub_sub/pub.yaml"
+ subManifest = fixturesDir + "/declarative_pub_sub/sub.yaml"
+ level = tests.Medium
+ )
+
+ BeforeEach(func() {
+ if testLevelEnv.Depth < int(level) {
+ Skip("Test depth is lower than the amount requested for this test")
+ }
+ })
+
+ Context("in a plain vanilla cluster", Ordered, func() {
+ const (
+ namespacePrefix = "declarative-pub-sub"
+ dbname = "declarative"
+ tableName = "test"
+ )
+ var (
+ sourceClusterName, destinationClusterName, namespace string
+ databaseObjectName, pubObjectName, subObjectName string
+ pub *apiv1.Publication
+ sub *apiv1.Subscription
+ err error
+ )
+
+ BeforeAll(func() {
+ // Create a cluster in a namespace we'll delete after the test
+ namespace, err = env.CreateUniqueTestNamespace(namespacePrefix)
+ Expect(err).ToNot(HaveOccurred())
+
+ sourceClusterName, err = env.GetResourceNameFromYAML(sourceClusterManifest)
+ Expect(err).ToNot(HaveOccurred())
+
+ destinationClusterName, err = env.GetResourceNameFromYAML(destinationClusterManifest)
+ Expect(err).ToNot(HaveOccurred())
+
+ By("setting up source cluster", func() {
+ AssertCreateCluster(namespace, sourceClusterName, sourceClusterManifest, env)
+ })
+
+ By("setting up destination cluster", func() {
+ AssertCreateCluster(namespace, destinationClusterName, destinationClusterManifest, env)
+ })
+ })
+
+ assertCreateDatabase := func(namespace, clusterName, databaseManifest, databaseName string) {
+ databaseObjectName, err = env.GetResourceNameFromYAML(databaseManifest)
+ Expect(err).NotTo(HaveOccurred())
+
+ By(fmt.Sprintf("applying the %s Database CRD manifest", databaseObjectName), func() {
+ CreateResourceFromFile(namespace, databaseManifest)
+ })
+
+ By(fmt.Sprintf("ensuring the %s Database CRD succeeded reconciliation", databaseObjectName), func() {
+ databaseObject := &apiv1.Database{}
+ databaseNamespacedName := types.NamespacedName{
+ Namespace: namespace,
+ Name: databaseObjectName,
+ }
+
+ Eventually(func(g Gomega) {
+ err := env.Client.Get(env.Ctx, databaseNamespacedName, databaseObject)
+ Expect(err).ToNot(HaveOccurred())
+ g.Expect(databaseObject.Status.Ready).Should(BeTrue())
+ }, 300).WithPolling(10 * time.Second).Should(Succeed())
+ })
+
+ By(fmt.Sprintf("verifying the %s database has been created", databaseName), func() {
+ primaryPodInfo, err := env.GetClusterPrimary(namespace, clusterName)
+ Expect(err).ToNot(HaveOccurred())
+
+ AssertDatabaseExists(primaryPodInfo, databaseName, true)
+ })
+ }
+
+ assertPublicationExists := func(namespace, primaryPod string, pub *apiv1.Publication) {
+ query := fmt.Sprintf("select count(*) from pg_publication where pubname = '%s'",
+ pub.Spec.Name)
+ Eventually(func(g Gomega) {
+ stdout, _, err := env.ExecQueryInInstancePod(
+ testUtils.PodLocator{
+ Namespace: namespace,
+ PodName: primaryPod,
+ },
+ dbname,
+ query)
+ g.Expect(err).ToNot(HaveOccurred())
+ g.Expect(stdout).Should(ContainSubstring("1"), "expected publication not found")
+ }, 30).Should(Succeed())
+ }
+
+ assertSubscriptionExists := func(namespace, primaryPod string, sub *apiv1.Subscription) {
+ query := fmt.Sprintf("select count(*) from pg_subscription where subname = '%s'",
+ sub.Spec.Name)
+ Eventually(func(g Gomega) {
+ stdout, _, err := env.ExecQueryInInstancePod(
+ testUtils.PodLocator{
+ Namespace: namespace,
+ PodName: primaryPod,
+ },
+ dbname,
+ query)
+ g.Expect(err).ToNot(HaveOccurred())
+ g.Expect(stdout).Should(ContainSubstring("1"), "expected subscription not found")
+ }, 30).Should(Succeed())
+ }
+
+ It("can perform logical replication", func() {
+ assertCreateDatabase(namespace, sourceClusterName, sourceDatabaseManifest, dbname)
+
+ tableLocator := TableLocator{
+ Namespace: namespace,
+ ClusterName: sourceClusterName,
+ DatabaseName: dbname,
+ TableName: tableName,
+ }
+ AssertCreateTestData(env, tableLocator)
+
+ assertCreateDatabase(namespace, destinationClusterName, destinationDatabaseManifest, dbname)
+
+ By("creating an empty table inside the destination database", func() {
+ query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %v (column1 int) ;", tableName)
+ _, err = testUtils.RunExecOverForward(env, namespace, destinationClusterName, dbname,
+ apiv1.ApplicationUserSecretSuffix, query)
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ By("applying Publication CRD manifest", func() {
+ CreateResourceFromFile(namespace, pubManifest)
+ pubObjectName, err = env.GetResourceNameFromYAML(pubManifest)
+ Expect(err).NotTo(HaveOccurred())
+ })
+
+ By("ensuring the Publication CRD succeeded reconciliation", func() {
+ // get publication object
+ pub = &apiv1.Publication{}
+ pubNamespacedName := types.NamespacedName{
+ Namespace: namespace,
+ Name: pubObjectName,
+ }
+
+ Eventually(func(g Gomega) {
+ err := env.Client.Get(env.Ctx, pubNamespacedName, pub)
+ Expect(err).ToNot(HaveOccurred())
+ g.Expect(pub.Status.Ready).Should(BeTrue())
+ }, 300).WithPolling(10 * time.Second).Should(Succeed())
+ })
+
+ By("verifying new publication has been created", func() {
+ primaryPodInfo, err := env.GetClusterPrimary(namespace, sourceClusterName)
+ Expect(err).ToNot(HaveOccurred())
+
+ assertPublicationExists(namespace, primaryPodInfo.Name, pub)
+ })
+
+ By("applying Subscription CRD manifest", func() {
+ CreateResourceFromFile(namespace, subManifest)
+ subObjectName, err = env.GetResourceNameFromYAML(subManifest)
+ Expect(err).NotTo(HaveOccurred())
+ })
+
+ By("ensuring the Subscription CRD succeeded reconciliation", func() {
+ // get subscription object
+ sub = &apiv1.Subscription{}
+ pubNamespacedName := types.NamespacedName{
+ Namespace: namespace,
+ Name: subObjectName,
+ }
+
+ Eventually(func(g Gomega) {
+ err := env.Client.Get(env.Ctx, pubNamespacedName, sub)
+ Expect(err).ToNot(HaveOccurred())
+ g.Expect(sub.Status.Ready).Should(BeTrue())
+ }, 300).WithPolling(10 * time.Second).Should(Succeed())
+ })
+
+ By("verifying new subscription has been created", func() {
+ primaryPodInfo, err := env.GetClusterPrimary(namespace, destinationClusterName)
+ Expect(err).ToNot(HaveOccurred())
+
+ assertSubscriptionExists(namespace, primaryPodInfo.Name, sub)
+ })
+
+ By("checking that the data is present inside the destination cluster database", func() {
+ tableLocator := TableLocator{
+ Namespace: namespace,
+ ClusterName: destinationClusterName,
+ DatabaseName: dbname,
+ TableName: tableName,
+ }
+ AssertDataExpectedCount(env, tableLocator, 2)
+ })
+
+ // TODO: remove once finalizers cleanup is handled by the operator
+ deleteObjectWithFinalizer := func(object client.Object, finalizerName string) error {
+ if err := testUtils.DeleteObject(env, object); err != nil {
+ return err
+ }
+
+ updatedObj := object.DeepCopyObject().(client.Object)
+ controllerutil.RemoveFinalizer(updatedObj, finalizerName)
+ if err := env.Client.Patch(env.Ctx, updatedObj, client.MergeFrom(object)); err != nil {
+ if apierrs.IsNotFound(err) {
+ return nil
+ }
+ return err
+ }
+
+ return nil
+ }
+
+ err = deleteObjectWithFinalizer(pub, utils.PublicationFinalizerName)
+ Expect(err).ToNot(HaveOccurred())
+
+ err = deleteObjectWithFinalizer(sub, utils.SubscriptionFinalizerName)
+ Expect(err).ToNot(HaveOccurred())
+ })
+ })
+})
diff --git a/tests/labels.go b/tests/labels.go
index 25b2b858b5..98649f2be2 100644
--- a/tests/labels.go
+++ b/tests/labels.go
@@ -32,6 +32,9 @@ const (
// LabelDeclarativeDatabases is a label for selecting the declarative databases test
LabelDeclarativeDatabases = "declarative-databases"
+ // LabelDeclarativePubSub is a label for selecting the publication / subscription test
+ LabelDeclarativePubSub = "publication-subscription"
+
// LabelDisruptive is the string for labelling disruptive tests
LabelDisruptive = "disruptive"