diff --git a/internal/management/controller/publication_controller_sql.go b/internal/management/controller/publication_controller_sql.go index 0597b5cfb7..4006c81463 100644 --- a/internal/management/controller/publication_controller_sql.go +++ b/internal/management/controller/publication_controller_sql.go @@ -125,13 +125,16 @@ func toPublicationCreateSQL(obj *apiv1.Publication) []string { func toPublicationAlterSQL(obj *apiv1.Publication) []string { result := make([]string, 0, 3) - result = append(result, - fmt.Sprintf( - "ALTER PUBLICATION %s SET %s", - pgx.Identifier{obj.Spec.Name}.Sanitize(), - toPublicationTargetSQL(&obj.Spec.Target), - ), - ) + + if len(obj.Spec.Target.Objects) > 0 { + result = append(result, + fmt.Sprintf( + "ALTER PUBLICATION %s SET %s", + pgx.Identifier{obj.Spec.Name}.Sanitize(), + toPublicationTargetObjectsSQL(&obj.Spec.Target), + ), + ) + } if len(obj.Spec.Owner) > 0 { result = append(result, @@ -177,6 +180,10 @@ func toPublicationTargetSQL(obj *apiv1.PublicationTarget) string { return "FOR ALL TABLES" } + return toPublicationTargetObjectsSQL(obj) +} + +func toPublicationTargetObjectsSQL(obj *apiv1.PublicationTarget) string { result := "" for _, object := range obj.Objects { if len(result) > 0 { diff --git a/internal/management/controller/subscription_controller_sql.go b/internal/management/controller/subscription_controller_sql.go index 775c887480..05925a8176 100644 --- a/internal/management/controller/subscription_controller_sql.go +++ b/internal/management/controller/subscription_controller_sql.go @@ -17,6 +17,7 @@ limitations under the License. package controller import ( + "bytes" "context" "database/sql" "fmt" @@ -110,7 +111,7 @@ func toSubscriptionCreateSQL(obj *apiv1.Subscription, connString string) []strin pgx.Identifier{obj.Spec.PublicationName}.Sanitize(), ) if len(obj.Spec.Parameters) > 0 { - createQuery = fmt.Sprintf("%s WITH (%s)", createQuery, obj.Spec.Parameters) + createQuery = fmt.Sprintf("%s WITH (%s)", createQuery, toPostgresParameters(obj.Spec.Parameters)) } result = append(result, createQuery) @@ -127,6 +128,16 @@ func toSubscriptionCreateSQL(obj *apiv1.Subscription, connString string) []strin return result } +func toPostgresParameters(parameters map[string]string) string { + b := new(bytes.Buffer) + for key, value := range parameters { + _, _ = fmt.Fprintf(b, "%s = '%s', ", key, value) + } + + // pruning last 2 chars `, ` + return b.String()[:len(b.String())-2] +} + func toSubscriptionAlterSQL(obj *apiv1.Subscription, connString string) []string { result := make([]string, 0, 4) @@ -158,7 +169,7 @@ func toSubscriptionAlterSQL(obj *apiv1.Subscription, connString string) []string fmt.Sprintf( "ALTER SUBSCRIPTION %s SET (%s)", result, - obj.Spec.Parameters, + toPostgresParameters(obj.Spec.Parameters), ), ) } diff --git a/internal/management/controller/subscription_controller_test.go b/internal/management/controller/subscription_controller_test.go new file mode 100644 index 0000000000..0c7a069a80 --- /dev/null +++ b/internal/management/controller/subscription_controller_test.go @@ -0,0 +1,32 @@ +/* +Copyright The CloudNativePG Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Conversion of PG parameters from map to string of key/value pairs", func() { + It("returns expected well-formed list", func() { + m := map[string]string{ + "a": "1", "b": "2", + } + res := toPostgresParameters(m) + Expect(res).To(BeEquivalentTo(`a = '1', b = '2'`)) + }) +}) diff --git a/tests/e2e/declarative_pub_sub_test.go b/tests/e2e/declarative_pub_sub_test.go index 4756586321..e5d9e6f14e 100644 --- a/tests/e2e/declarative_pub_sub_test.go +++ b/tests/e2e/declarative_pub_sub_test.go @@ -36,12 +36,13 @@ import ( var _ = Describe("Declarative publication and subscription test", Label(tests.LabelSmoke, tests.LabelBasic, tests.LabelDeclarativePubSub), func() { const ( - sourceClusterManifest = fixturesDir + "/declarative_pub_sub/source-cluster.yaml.template" - destinationClusterManifest = fixturesDir + "/declarative_pub_sub/destination-cluster.yaml.template" - databaseManifest = fixturesDir + "/declarative_pub_sub/database.yaml.template" - pubManifest = fixturesDir + "/declarative_pub_sub/pub.yaml.template" - subManifest = fixturesDir + "/declarative_pub_sub/sub.yaml.template" - level = tests.Medium + sourceClusterManifest = fixturesDir + "/declarative_pub_sub/source-cluster.yaml.template" + destinationClusterManifest = fixturesDir + "/declarative_pub_sub/destination-cluster.yaml.template" + sourceDatabaseManifest = fixturesDir + "/declarative_pub_sub/source-database.yaml.template" + destinationDatabaseManifest = fixturesDir + "/declarative_pub_sub/destination-database.yaml.template" + pubManifest = fixturesDir + "/declarative_pub_sub/pub.yaml.template" + subManifest = fixturesDir + "/declarative_pub_sub/sub.yaml.template" + level = tests.Medium ) BeforeEach(func() { @@ -54,13 +55,11 @@ var _ = Describe("Declarative publication and subscription test", Label(tests.La const ( namespacePrefix = "declarative-pub-sub" dbname = "declarative" - pubName = "declarative-pub" - subName = "declarative-sub" ) var ( sourceClusterName, destinationClusterName, namespace string databaseObjectName, pubObjectName, subObjectName string - database *apiv1.Database + sourceDatabase, destinationDatabase *apiv1.Database pub *apiv1.Publication sub *apiv1.Subscription err error @@ -118,32 +117,57 @@ var _ = Describe("Declarative publication and subscription test", Label(tests.La }, 30).Should(Succeed()) } - It("can add a declarative database, publication and subscription", func() { //nolint:dupl - By("applying Database CRD manifest", func() { - CreateResourceFromFile(namespace, databaseManifest) - databaseObjectName, err = env.GetResourceNameFromYAML(databaseManifest) + It("can add declarative databases, publication and subscription", func() { //nolint:dupl + By("applying source Database CRD manifest", func() { + CreateResourceFromFile(namespace, sourceDatabaseManifest) + databaseObjectName, err = env.GetResourceNameFromYAML(sourceDatabaseManifest) Expect(err).NotTo(HaveOccurred()) }) - By("ensuring the Database CRD succeeded reconciliation", func() { - // get database object - database = &apiv1.Database{} + By("ensuring the source Database CRD succeeded reconciliation", func() { + // get source database object + sourceDatabase = &apiv1.Database{} databaseNamespacedName := types.NamespacedName{ Namespace: namespace, Name: databaseObjectName, } Eventually(func(g Gomega) { - err := env.Client.Get(env.Ctx, databaseNamespacedName, database) + err := env.Client.Get(env.Ctx, databaseNamespacedName, sourceDatabase) Expect(err).ToNot(HaveOccurred()) - g.Expect(database.Status.Ready).Should(BeTrue()) + g.Expect(sourceDatabase.Status.Ready).Should(BeTrue()) }, 300).WithPolling(10 * time.Second).Should(Succeed()) }) - By("verifying new database has been created", func() { + By("verifying source database has been created", func() { primaryPodInfo, err := env.GetClusterPrimary(namespace, sourceClusterName) Expect(err).ToNot(HaveOccurred()) AssertDatabaseExists(namespace, primaryPodInfo.Name, dbname, true) }) + By("applying destination Database CRD manifest", func() { + CreateResourceFromFile(namespace, destinationDatabaseManifest) + databaseObjectName, err = env.GetResourceNameFromYAML(destinationDatabaseManifest) + Expect(err).NotTo(HaveOccurred()) + }) + By("ensuring the destination Database CRD succeeded reconciliation", func() { + // get destination database object + destinationDatabase = &apiv1.Database{} + databaseNamespacedName := types.NamespacedName{ + Namespace: namespace, + Name: databaseObjectName, + } + + Eventually(func(g Gomega) { + err := env.Client.Get(env.Ctx, databaseNamespacedName, destinationDatabase) + Expect(err).ToNot(HaveOccurred()) + g.Expect(sourceDatabase.Status.Ready).Should(BeTrue()) + }, 300).WithPolling(10 * time.Second).Should(Succeed()) + }) + By("verifying destination database has been created", func() { + primaryPodInfo, err := env.GetClusterPrimary(namespace, destinationClusterName) + Expect(err).ToNot(HaveOccurred()) + + AssertDatabaseExists(namespace, primaryPodInfo.Name, dbname, true) + }) By("applying Publication CRD manifest", func() { CreateResourceFromFile(namespace, pubManifest) pubObjectName, err = env.GetResourceNameFromYAML(pubManifest) diff --git a/tests/e2e/fixtures/declarative_pub_sub/destination-cluster.yaml.template b/tests/e2e/fixtures/declarative_pub_sub/destination-cluster.yaml.template index c52698e6d2..d82c6387ff 100644 --- a/tests/e2e/fixtures/declarative_pub_sub/destination-cluster.yaml.template +++ b/tests/e2e/fixtures/declarative_pub_sub/destination-cluster.yaml.template @@ -11,6 +11,9 @@ spec: user: app dbname: declarative port: "5432" + password: + name: source-cluster-app + key: password postgresql: parameters: diff --git a/tests/e2e/fixtures/declarative_pub_sub/destination-database.yaml.template b/tests/e2e/fixtures/declarative_pub_sub/destination-database.yaml.template new file mode 100644 index 0000000000..2a6e122647 --- /dev/null +++ b/tests/e2e/fixtures/declarative_pub_sub/destination-database.yaml.template @@ -0,0 +1,9 @@ +apiVersion: postgresql.cnpg.io/v1 +kind: Database +metadata: + name: destination-db-declarative +spec: + name: declarative + owner: app + cluster: + name: destination-cluster diff --git a/tests/e2e/fixtures/declarative_pub_sub/pub.yaml.template b/tests/e2e/fixtures/declarative_pub_sub/pub.yaml.template index 1fa6bada73..5768a1b367 100644 --- a/tests/e2e/fixtures/declarative_pub_sub/pub.yaml.template +++ b/tests/e2e/fixtures/declarative_pub_sub/pub.yaml.template @@ -5,6 +5,7 @@ metadata: spec: name: pub dbname: declarative + owner: app cluster: name: source-cluster target: diff --git a/tests/e2e/fixtures/declarative_pub_sub/source-cluster.yaml.template b/tests/e2e/fixtures/declarative_pub_sub/source-cluster.yaml.template index 5b0c561dd3..303b0a7388 100644 --- a/tests/e2e/fixtures/declarative_pub_sub/source-cluster.yaml.template +++ b/tests/e2e/fixtures/declarative_pub_sub/source-cluster.yaml.template @@ -15,6 +15,16 @@ spec: log_temp_files: '1024' log_autovacuum_min_duration: '1s' log_replication_commands: 'on' + pg_hba: + - hostssl replication app all scram-sha-256 + + managed: + roles: + - name: app + ensure: present + login: true + replication: true + # Example of rolling update strategy: # - unsupervised: automated update of the primary once all diff --git a/tests/e2e/fixtures/declarative_pub_sub/database.yaml.template b/tests/e2e/fixtures/declarative_pub_sub/source-database.yaml.template similarity index 81% rename from tests/e2e/fixtures/declarative_pub_sub/database.yaml.template rename to tests/e2e/fixtures/declarative_pub_sub/source-database.yaml.template index 0792002b55..80d5a4cf27 100644 --- a/tests/e2e/fixtures/declarative_pub_sub/database.yaml.template +++ b/tests/e2e/fixtures/declarative_pub_sub/source-database.yaml.template @@ -1,7 +1,7 @@ apiVersion: postgresql.cnpg.io/v1 kind: Database metadata: - name: db-declarative + name: source-db-declarative spec: name: declarative owner: app diff --git a/tests/e2e/fixtures/declarative_pub_sub/sub.yaml.template b/tests/e2e/fixtures/declarative_pub_sub/sub.yaml.template index 146381a945..0988d7196a 100644 --- a/tests/e2e/fixtures/declarative_pub_sub/sub.yaml.template +++ b/tests/e2e/fixtures/declarative_pub_sub/sub.yaml.template @@ -5,8 +5,8 @@ metadata: spec: name: sub dbname: declarative - owner: app publicationName: pub + owner: app cluster: name: destination-cluster externalClusterName: source-cluster