diff --git a/internal/management/controller/publication_controller.go b/internal/management/controller/publication_controller.go index f27f5aaf87..bed200d923 100644 --- a/internal/management/controller/publication_controller.go +++ b/internal/management/controller/publication_controller.go @@ -123,22 +123,35 @@ func (r *PublicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{RequeueAfter: publicationReconciliationInterval}, markAsReady(ctx, r.Client, &publication) } +func (r *PublicationReconciler) evaluateDropPublication(ctx context.Context, pub *apiv1.Publication) error { + if pub.Spec.ReclaimPolicy != apiv1.PublicationReclaimDelete { + return nil + } + db, err := r.instance.ConnectionPool().Connection(pub.Spec.DBName) + if err != nil { + return fmt.Errorf("while getting DB connection: %w", err) + } + + return executeDropPublication(ctx, db, pub.Spec.Name) +} + // NewPublicationReconciler creates a new publication reconciler func NewPublicationReconciler( mgr manager.Manager, instance *postgres.Instance, ) *PublicationReconciler { - onFinalizerDelete := func(ctx context.Context, pub *apiv1.Publication) error { - if pub.Spec.ReclaimPolicy == apiv1.PublicationReclaimDelete { - return dropPublication(ctx, instance, pub) - } - return nil - } - return &PublicationReconciler{ - Client: mgr.GetClient(), - instance: instance, - finalizerReconciler: newFinalizerReconciler(mgr.GetClient(), utils.PublicationFinalizerName, onFinalizerDelete), + pr := &PublicationReconciler{ + Client: mgr.GetClient(), + instance: instance, } + + pr.finalizerReconciler = newFinalizerReconciler( + mgr.GetClient(), + utils.PublicationFinalizerName, + pr.evaluateDropPublication, + ) + + return pr } // SetupWithManager sets up the controller with the Manager. diff --git a/internal/management/controller/publication_controller_sql.go b/internal/management/controller/publication_controller_sql.go index e76dbeacfb..3a56f450b8 100644 --- a/internal/management/controller/publication_controller_sql.go +++ b/internal/management/controller/publication_controller_sql.go @@ -25,7 +25,6 @@ import ( "github.com/jackc/pgx/v5" apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" - "github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres" ) func (r *PublicationReconciler) alignPublication(ctx context.Context, obj *apiv1.Publication) error { @@ -160,15 +159,10 @@ func toPublicationAlterSQL(obj *apiv1.Publication) []string { return result } -func dropPublication(ctx context.Context, instance *postgres.Instance, obj *apiv1.Publication) error { - db, err := instance.ConnectionPool().Connection(obj.Spec.DBName) - if err != nil { - return fmt.Errorf("while getting DB connection: %w", err) - } - +func executeDropPublication(ctx context.Context, db *sql.DB, name string) error { if _, err := db.ExecContext( ctx, - fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", pgx.Identifier{obj.Spec.Name}.Sanitize()), + fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", pgx.Identifier{name}.Sanitize()), ); err != nil { return fmt.Errorf("while dropping publication: %w", err) } diff --git a/internal/management/controller/publication_controller_sql_test.go b/internal/management/controller/publication_controller_sql_test.go new file mode 100644 index 0000000000..c242f533fa --- /dev/null +++ b/internal/management/controller/publication_controller_sql_test.go @@ -0,0 +1,73 @@ +/* +Copyright The CloudNativePG Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// nolint: dupl +package controller + +import ( + "database/sql" + "fmt" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/jackc/pgx/v5" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("publication sql", func() { + var ( + dbMock sqlmock.Sqlmock + db *sql.DB + ) + + BeforeEach(func() { + var err error + db, dbMock, err = sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + Expect(dbMock.ExpectationsWereMet()).To(Succeed()) + }) + + It("drops the publication successfully", func(ctx SpecContext) { + dbMock.ExpectExec(fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", pgx.Identifier{"publication_name"}.Sanitize())). + WillReturnResult(sqlmock.NewResult(1, 1)) + + err := executeDropPublication(ctx, db, "publication_name") + Expect(err).ToNot(HaveOccurred()) + }) + + It("returns an error when dropping the publication fails", func(ctx SpecContext) { + dbMock.ExpectExec(fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", + pgx.Identifier{"publication_name"}.Sanitize())). + WillReturnError(fmt.Errorf("drop publication error")) + + err := executeDropPublication(ctx, db, "publication_name") + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("while dropping publication: drop publication error")) + }) + + It("sanitizes the publication name correctly", func(ctx SpecContext) { + dbMock.ExpectExec( + fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", pgx.Identifier{"sanitized_name"}.Sanitize())). + WillReturnResult(sqlmock.NewResult(1, 1)) + + err := executeDropPublication(ctx, db, "sanitized_name") + Expect(err).ToNot(HaveOccurred()) + }) +}) diff --git a/internal/management/controller/subscription_controller.go b/internal/management/controller/subscription_controller.go index 15a307e7c7..4e8b6c0f1a 100644 --- a/internal/management/controller/subscription_controller.go +++ b/internal/management/controller/subscription_controller.go @@ -139,22 +139,31 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request ) } +func (r *SubscriptionReconciler) evaluateDropSubscription(ctx context.Context, sub *apiv1.Subscription) error { + if sub.Spec.ReclaimPolicy != apiv1.SubscriptionReclaimDelete { + return nil + } + + db, err := r.instance.ConnectionPool().Connection(sub.Spec.DBName) + if err != nil { + return fmt.Errorf("while getting DB connection: %w", err) + } + return executeDropSubscription(ctx, db, sub.Spec.Name) +} + // NewSubscriptionReconciler creates a new subscription reconciler func NewSubscriptionReconciler( mgr manager.Manager, instance *postgres.Instance, ) *SubscriptionReconciler { - onFinalizerDelete := func(ctx context.Context, sub *apiv1.Subscription) error { - if sub.Spec.ReclaimPolicy == apiv1.SubscriptionReclaimDelete { - return dropSubscription(ctx, instance, sub) - } - return nil - } - return &SubscriptionReconciler{ - Client: mgr.GetClient(), - instance: instance, - finalizerReconciler: newFinalizerReconciler(mgr.GetClient(), utils.SubscriptionFinalizerName, onFinalizerDelete), - } + sr := &SubscriptionReconciler{Client: mgr.GetClient(), instance: instance} + sr.finalizerReconciler = newFinalizerReconciler( + mgr.GetClient(), + utils.SubscriptionFinalizerName, + sr.evaluateDropSubscription, + ) + + return sr } // SetupWithManager sets up the controller with the Manager diff --git a/internal/management/controller/subscription_controller_sql.go b/internal/management/controller/subscription_controller_sql.go index 1cad22b0e1..ec38ecbb6c 100644 --- a/internal/management/controller/subscription_controller_sql.go +++ b/internal/management/controller/subscription_controller_sql.go @@ -26,7 +26,6 @@ import ( "github.com/lib/pq" apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" - "github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres" ) func (r *SubscriptionReconciler) alignSubscription( @@ -178,15 +177,10 @@ func toSubscriptionAlterSQL(obj *apiv1.Subscription, connString string) []string return result } -func dropSubscription(ctx context.Context, instance *postgres.Instance, obj *apiv1.Subscription) error { - db, err := instance.ConnectionPool().Connection(obj.Spec.DBName) - if err != nil { - return fmt.Errorf("while getting DB connection: %w", err) - } - +func executeDropSubscription(ctx context.Context, db *sql.DB, name string) error { if _, err := db.ExecContext( ctx, - fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", pgx.Identifier{obj.Spec.Name}.Sanitize()), + fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", pgx.Identifier{name}.Sanitize()), ); err != nil { return fmt.Errorf("while dropping subscription: %w", err) } diff --git a/internal/management/controller/subscription_controller_sql_test.go b/internal/management/controller/subscription_controller_sql_test.go new file mode 100644 index 0000000000..192d3576d5 --- /dev/null +++ b/internal/management/controller/subscription_controller_sql_test.go @@ -0,0 +1,72 @@ +/* +Copyright The CloudNativePG Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// nolint: dupl +package controller + +import ( + "database/sql" + "fmt" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/jackc/pgx/v5" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// nolint: dupl +var _ = Describe("subscription sql", func() { + var ( + dbMock sqlmock.Sqlmock + db *sql.DB + ) + + BeforeEach(func() { + var err error + db, dbMock, err = sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + Expect(dbMock.ExpectationsWereMet()).To(Succeed()) + }) + + It("drops the subscription successfully", func(ctx SpecContext) { + dbMock.ExpectExec(fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", pgx.Identifier{"subscription_name"}.Sanitize())). + WillReturnResult(sqlmock.NewResult(1, 1)) + + err := executeDropSubscription(ctx, db, "subscription_name") + Expect(err).ToNot(HaveOccurred()) + }) + + It("returns an error when dropping the subscription fails", func(ctx SpecContext) { + dbMock.ExpectExec(fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", pgx.Identifier{"subscription_name"}.Sanitize())). + WillReturnError(fmt.Errorf("drop subscription error")) + + err := executeDropSubscription(ctx, db, "subscription_name") + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("while dropping subscription: drop subscription error")) + }) + + It("sanitizes the subscription name correctly", func(ctx SpecContext) { + dbMock.ExpectExec(fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", pgx.Identifier{"sanitized_name"}.Sanitize())). + WillReturnResult(sqlmock.NewResult(1, 1)) + + err := executeDropSubscription(ctx, db, "sanitized_name") + Expect(err).ToNot(HaveOccurred()) + }) +})