From c0cfeb338f4b1178d55dac1b42cb2659b50f247c Mon Sep 17 00:00:00 2001 From: Gabriele Quaresima Date: Mon, 14 Oct 2024 11:17:11 +0200 Subject: [PATCH] chore: split SQL functions in separated files Signed-off-by: Gabriele Quaresima --- .../postgresql_v1_subscription.yaml | 2 +- .../controller/publication_controller.go | 182 +--------------- .../controller/publication_controller_sql.go | 200 ++++++++++++++++++ .../controller/subscription_controller.go | 166 +-------------- .../controller/subscription_controller_sql.go | 183 ++++++++++++++++ tests/e2e/declarative_pub_sub_test.go | 1 + 6 files changed, 391 insertions(+), 343 deletions(-) create mode 100644 internal/management/controller/publication_controller_sql.go create mode 100644 internal/management/controller/subscription_controller_sql.go create mode 100644 tests/e2e/declarative_pub_sub_test.go diff --git a/config/olm-samples/postgresql_v1_subscription.yaml b/config/olm-samples/postgresql_v1_subscription.yaml index 7b54d422ba..126a28ff5f 100644 --- a/config/olm-samples/postgresql_v1_subscription.yaml +++ b/config/olm-samples/postgresql_v1_subscription.yaml @@ -8,4 +8,4 @@ spec: publicationName: pub cluster: name: cluster-example-dest - externalClusterName: cluster-example \ No newline at end of file + externalClusterName: cluster-example diff --git a/internal/management/controller/publication_controller.go b/internal/management/controller/publication_controller.go index 2cc4734993..2dbbf3fc5c 100644 --- a/internal/management/controller/publication_controller.go +++ b/internal/management/controller/publication_controller.go @@ -18,14 +18,11 @@ package controller import ( "context" - "database/sql" "errors" "fmt" - "strings" "time" "github.com/cloudnative-pg/machinery/pkg/log" - "github.com/jackc/pgx/v5" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -165,7 +162,7 @@ func (r *PublicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) ) } - return r.succeededRenconciliation( + return r.succeededReconciliation( ctx, &publication, ) @@ -201,8 +198,7 @@ func (r *PublicationReconciler) failedReconciliation( var statusError *instance.StatusError if errors.As(err, &statusError) { - // The body line of the instance manager contain the human - // readable error + // The body line of the instance manager contain the human-readable error publication.Status.Error = statusError.Body } @@ -216,7 +212,7 @@ func (r *PublicationReconciler) failedReconciliation( } // succeededReconciliation marks the reconciliation as succeeded -func (r *PublicationReconciler) succeededRenconciliation( +func (r *PublicationReconciler) succeededReconciliation( ctx context.Context, publication *apiv1.Publication, ) (ctrl.Result, error) { @@ -249,175 +245,3 @@ func (r *PublicationReconciler) GetCluster(ctx context.Context) (*apiv1.Cluster, return &cluster, nil } - -func (r *PublicationReconciler) alignPublication(ctx context.Context, obj *apiv1.Publication) error { - db, err := r.instance.ConnectionPool().Connection(obj.Spec.DBName) - if err != nil { - return fmt.Errorf("while getting DB connection: %w", err) - } - - row := db.QueryRowContext( - ctx, - ` - SELECT count(*) - FROM pg_publication - WHERE pubname = $1 - `, - obj.Spec.Name) - if row.Err() != nil { - return fmt.Errorf("while getting publication status: %w", row.Err()) - } - - var count int - if err := row.Scan(&count); err != nil { - return fmt.Errorf("while getting publication status (scan): %w", err) - } - - if count > 0 { - if err := r.patchPublication(ctx, db, obj); err != nil { - return fmt.Errorf("while patching publication: %w", err) - } - return nil - } - - if err := r.createPublication(ctx, db, obj); err != nil { - return fmt.Errorf("while creating publication: %w", err) - } - - return nil -} - -func (r *PublicationReconciler) patchPublication( - ctx context.Context, - db *sql.DB, - obj *apiv1.Publication, -) error { - sqls := toPublicationAlterSQL(obj) - for _, sqlQuery := range sqls { - if _, err := db.ExecContext(ctx, sqlQuery); err != nil { - return err - } - } - - return nil -} - -func (r *PublicationReconciler) createPublication( - ctx context.Context, - db *sql.DB, - obj *apiv1.Publication, -) error { - sqls := toPublicationCreateSQL(obj) - for _, sqlQuery := range sqls { - if _, err := db.ExecContext(ctx, sqlQuery); err != nil { - return err - } - } - return nil -} - -func toPublicationCreateSQL(obj *apiv1.Publication) []string { - result := make([]string, 0, 2) - - result = append(result, - fmt.Sprintf( - "CREATE PUBLICATION %s %s", - pgx.Identifier{obj.Spec.Name}.Sanitize(), - toPublicationTargetSQL(&obj.Spec.Target), - ), - ) - - if len(obj.Spec.Owner) > 0 { - result = append(result, - fmt.Sprintf( - "ALTER PUBLICATION %s OWNER to %s", - pgx.Identifier{obj.Spec.Name}.Sanitize(), - toPublicationTargetSQL(&obj.Spec.Target), - ), - ) - } - - if len(obj.Spec.Parameters) > 0 { - result = append(result, - fmt.Sprintf("%s WITH (%s)", result, obj.Spec.Parameters), - ) - } - - return result -} - -func toPublicationAlterSQL(obj *apiv1.Publication) []string { - result := make([]string, 0, 3) - result = append(result, - fmt.Sprintf( - "ALTER PUBLICATION %s SET %s", - pgx.Identifier{obj.Spec.Name}.Sanitize(), - toPublicationTargetSQL(&obj.Spec.Target), - ), - ) - - if len(obj.Spec.Owner) > 0 { - result = append(result, - fmt.Sprintf( - "ALTER PUBLICATION %s OWNER TO %s", - pgx.Identifier{obj.Spec.Name}.Sanitize(), - pgx.Identifier{obj.Spec.Owner}.Sanitize(), - ), - ) - } - - if len(obj.Spec.Parameters) > 0 { - result = append(result, - fmt.Sprintf( - "ALTER PUBLICATION %s SET (%s)", - result, - obj.Spec.Parameters, - ), - ) - } - - return result -} - -func (r *PublicationReconciler) dropPublication(ctx context.Context, obj *apiv1.Publication) error { - db, err := r.instance.ConnectionPool().Connection(obj.Spec.DBName) - if err != nil { - return fmt.Errorf("while getting DB connection: %w", err) - } - - if _, err := db.ExecContext( - ctx, - fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", pgx.Identifier{obj.Spec.Name}.Sanitize()), - ); err != nil { - return fmt.Errorf("while dropping publication: %w", err) - } - - return nil -} - -func toPublicationTargetSQL(obj *apiv1.PublicationTarget) string { - if obj.AllTables != nil { - return "FOR ALL TABLES" - } - - result := "" - for _, object := range obj.Objects { - if len(result) > 0 { - result += ", " - } - result += toPublicationObjectSQL(&object) - } - - if len(result) > 0 { - result = fmt.Sprintf("FOR %s", result) - } - return result -} - -func toPublicationObjectSQL(obj *apiv1.PublicationTargetObject) string { - if len(obj.Schema) > 0 { - return fmt.Sprintf("TABLES IN SCHEMA %s", pgx.Identifier{obj.Schema}.Sanitize()) - } - - return fmt.Sprintf("TABLE %s", strings.Join(obj.TableExpression, ", ")) -} diff --git a/internal/management/controller/publication_controller_sql.go b/internal/management/controller/publication_controller_sql.go new file mode 100644 index 0000000000..da65cde219 --- /dev/null +++ b/internal/management/controller/publication_controller_sql.go @@ -0,0 +1,200 @@ +/* +Copyright The CloudNativePG Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/jackc/pgx/v5" + + apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" +) + +func (r *PublicationReconciler) alignPublication(ctx context.Context, obj *apiv1.Publication) error { + db, err := r.instance.ConnectionPool().Connection(obj.Spec.DBName) + if err != nil { + return fmt.Errorf("while getting DB connection: %w", err) + } + + row := db.QueryRowContext( + ctx, + ` + SELECT count(*) + FROM pg_publication + WHERE pubname = $1 + `, + obj.Spec.Name) + if row.Err() != nil { + return fmt.Errorf("while getting publication status: %w", row.Err()) + } + + var count int + if err := row.Scan(&count); err != nil { + return fmt.Errorf("while getting publication status (scan): %w", err) + } + + if count > 0 { + if err := r.patchPublication(ctx, db, obj); err != nil { + return fmt.Errorf("while patching publication: %w", err) + } + return nil + } + + if err := r.createPublication(ctx, db, obj); err != nil { + return fmt.Errorf("while creating publication: %w", err) + } + + return nil +} + +func (r *PublicationReconciler) patchPublication( + ctx context.Context, + db *sql.DB, + obj *apiv1.Publication, +) error { + sqls := toPublicationAlterSQL(obj) + for _, sqlQuery := range sqls { + if _, err := db.ExecContext(ctx, sqlQuery); err != nil { + return err + } + } + + return nil +} + +func (r *PublicationReconciler) createPublication( + ctx context.Context, + db *sql.DB, + obj *apiv1.Publication, +) error { + sqls := toPublicationCreateSQL(obj) + for _, sqlQuery := range sqls { + if _, err := db.ExecContext(ctx, sqlQuery); err != nil { + return err + } + } + return nil +} + +func toPublicationCreateSQL(obj *apiv1.Publication) []string { + result := make([]string, 0, 2) + + result = append(result, + fmt.Sprintf( + "CREATE PUBLICATION %s %s", + pgx.Identifier{obj.Spec.Name}.Sanitize(), + toPublicationTargetSQL(&obj.Spec.Target), + ), + ) + + if len(obj.Spec.Owner) > 0 { + result = append(result, + fmt.Sprintf( + "ALTER PUBLICATION %s OWNER to %s", + pgx.Identifier{obj.Spec.Name}.Sanitize(), + toPublicationTargetSQL(&obj.Spec.Target), + ), + ) + } + + if len(obj.Spec.Parameters) > 0 { + result = append(result, + fmt.Sprintf("%s WITH (%s)", result, obj.Spec.Parameters), + ) + } + + return result +} + +func toPublicationAlterSQL(obj *apiv1.Publication) []string { + result := make([]string, 0, 3) + result = append(result, + fmt.Sprintf( + "ALTER PUBLICATION %s SET %s", + pgx.Identifier{obj.Spec.Name}.Sanitize(), + toPublicationTargetSQL(&obj.Spec.Target), + ), + ) + + if len(obj.Spec.Owner) > 0 { + result = append(result, + fmt.Sprintf( + "ALTER PUBLICATION %s OWNER TO %s", + pgx.Identifier{obj.Spec.Name}.Sanitize(), + pgx.Identifier{obj.Spec.Owner}.Sanitize(), + ), + ) + } + + if len(obj.Spec.Parameters) > 0 { + result = append(result, + fmt.Sprintf( + "ALTER PUBLICATION %s SET (%s)", + result, + obj.Spec.Parameters, + ), + ) + } + + return result +} + +func (r *PublicationReconciler) dropPublication(ctx context.Context, obj *apiv1.Publication) error { + db, err := r.instance.ConnectionPool().Connection(obj.Spec.DBName) + if err != nil { + return fmt.Errorf("while getting DB connection: %w", err) + } + + if _, err := db.ExecContext( + ctx, + fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", pgx.Identifier{obj.Spec.Name}.Sanitize()), + ); err != nil { + return fmt.Errorf("while dropping publication: %w", err) + } + + return nil +} + +func toPublicationTargetSQL(obj *apiv1.PublicationTarget) string { + if obj.AllTables != nil { + return "FOR ALL TABLES" + } + + result := "" + for _, object := range obj.Objects { + if len(result) > 0 { + result += ", " + } + result += toPublicationObjectSQL(&object) + } + + if len(result) > 0 { + result = fmt.Sprintf("FOR %s", result) + } + return result +} + +func toPublicationObjectSQL(obj *apiv1.PublicationTargetObject) string { + if len(obj.Schema) > 0 { + return fmt.Sprintf("TABLES IN SCHEMA %s", pgx.Identifier{obj.Schema}.Sanitize()) + } + + return fmt.Sprintf("TABLE %s", strings.Join(obj.TableExpression, ", ")) +} diff --git a/internal/management/controller/subscription_controller.go b/internal/management/controller/subscription_controller.go index e5fd29254b..3a60448c97 100644 --- a/internal/management/controller/subscription_controller.go +++ b/internal/management/controller/subscription_controller.go @@ -18,14 +18,11 @@ package controller import ( "context" - "database/sql" "errors" "fmt" "time" "github.com/cloudnative-pg/machinery/pkg/log" - "github.com/jackc/pgx/v5" - "github.com/lib/pq" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -190,7 +187,7 @@ func NewSubscriptionReconciler( } } -// SetupWithManager sets up the controller with the Manager. +// SetupWithManager sets up the controller with the Manager func (r *SubscriptionReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&apiv1.Subscription{}). @@ -209,8 +206,7 @@ func (r *SubscriptionReconciler) failedReconciliation( var statusError *instance.StatusError if errors.As(err, &statusError) { - // The body line of the instance manager contain the human - // readable error + // The body line of the instance manager contain the human-readable error subscription.Status.Error = statusError.Body } @@ -258,164 +254,8 @@ func (r *SubscriptionReconciler) GetCluster(ctx context.Context) (*apiv1.Cluster return &cluster, nil } -func (r *SubscriptionReconciler) alignSubscription( - ctx context.Context, - obj *apiv1.Subscription, - connString string, -) error { - db, err := r.instance.ConnectionPool().Connection(obj.Spec.DBName) - if err != nil { - return fmt.Errorf("while getting DB connection: %w", err) - } - - row := db.QueryRowContext( - ctx, - ` - SELECT count(*) - FROM pg_subscription - WHERE subname = $1 - `, - obj.Spec.Name) - if row.Err() != nil { - return fmt.Errorf("while getting subscription status: %w", row.Err()) - } - - var count int - if err := row.Scan(&count); err != nil { - return fmt.Errorf("while getting subscription status (scan): %w", err) - } - - if count > 0 { - if err := r.patchSubscription(ctx, db, obj, connString); err != nil { - return fmt.Errorf("while patching subscription: %w", err) - } - return nil - } - - if err := r.createSubscription(ctx, db, obj, connString); err != nil { - return fmt.Errorf("while creating subscription: %w", err) - } - - return nil -} - -func (r *SubscriptionReconciler) patchSubscription( - ctx context.Context, - db *sql.DB, - obj *apiv1.Subscription, - connString string, -) error { - sqls := toSubscriptionAlterSQL(obj, connString) - for _, sqlQuery := range sqls { - if _, err := db.ExecContext(ctx, sqlQuery); err != nil { - return err - } - } - - return nil -} - -func (r *SubscriptionReconciler) createSubscription( - ctx context.Context, - db *sql.DB, - obj *apiv1.Subscription, - connString string, -) error { - sqls := toSubscriptionCreateSQL(obj, connString) - for _, sqlQuery := range sqls { - if _, err := db.ExecContext(ctx, sqlQuery); err != nil { - return err - } - } - - return nil -} - -func toSubscriptionCreateSQL(obj *apiv1.Subscription, connString string) []string { - result := make([]string, 0, 2) - - createQuery := fmt.Sprintf( - "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s", - pgx.Identifier{obj.Spec.Name}.Sanitize(), - pq.QuoteLiteral(connString), - pgx.Identifier{obj.Spec.PublicationName}.Sanitize(), - ) - if len(obj.Spec.Parameters) > 0 { - createQuery = fmt.Sprintf("%s WITH (%s)", createQuery, obj.Spec.Parameters) - } - result = append(result, createQuery) - - if len(obj.Spec.Owner) > 0 { - result = append(result, - fmt.Sprintf( - "ALTER SUBSCRIPTION %s OWNER TO %s", - pgx.Identifier{obj.Spec.Name}.Sanitize(), - pgx.Identifier{obj.Spec.Owner}.Sanitize(), - ), - ) - } - - return result -} - -func toSubscriptionAlterSQL(obj *apiv1.Subscription, connString string) []string { - result := make([]string, 0, 4) - - setPublicationSQL := fmt.Sprintf( - "ALTER SUBSCRIPTION %s SET PUBLICATION %s", - pgx.Identifier{obj.Spec.Name}.Sanitize(), - pgx.Identifier{obj.Spec.PublicationName}.Sanitize(), - ) - - setConnStringSQL := fmt.Sprintf( - "ALTER SUBSCRIPTION %s CONNECTION %s", - pgx.Identifier{obj.Spec.Name}.Sanitize(), - pq.QuoteLiteral(connString), - ) - result = append(result, setPublicationSQL, setConnStringSQL) - - if len(obj.Spec.Owner) > 0 { - result = append(result, - fmt.Sprintf( - "ALTER SUBSCRIPTION %s OWNER TO %s", - pgx.Identifier{obj.Spec.Name}.Sanitize(), - pgx.Identifier{obj.Spec.Owner}.Sanitize(), - ), - ) - } - - if len(obj.Spec.Parameters) > 0 { - result = append(result, - fmt.Sprintf( - "ALTER SUBSCRIPTION %s SET (%s)", - result, - obj.Spec.Parameters, - ), - ) - } - - return result -} - -func (r *SubscriptionReconciler) dropSubscription(ctx context.Context, obj *apiv1.Subscription) error { - db, err := r.instance.ConnectionPool().Connection(obj.Spec.DBName) - if err != nil { - return fmt.Errorf("while getting DB connection: %w", err) - } - - if _, err := db.ExecContext( - ctx, - fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", pgx.Identifier{obj.Spec.Name}.Sanitize()), - ); err != nil { - return fmt.Errorf("while dropping subscription: %w", err) - } - - return nil -} - // getSubscriptionConnectionString gets the connection string to be used to connect to -// the specified external cluster, while connected to a pod of the specified -// cluster. +// the specified external cluster, while connected to a pod of the specified cluster func getSubscriptionConnectionString( cluster *apiv1.Cluster, externalClusterName string, diff --git a/internal/management/controller/subscription_controller_sql.go b/internal/management/controller/subscription_controller_sql.go new file mode 100644 index 0000000000..775c887480 --- /dev/null +++ b/internal/management/controller/subscription_controller_sql.go @@ -0,0 +1,183 @@ +/* +Copyright The CloudNativePG Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "database/sql" + "fmt" + + "github.com/jackc/pgx/v5" + "github.com/lib/pq" + + apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" +) + +func (r *SubscriptionReconciler) alignSubscription( + ctx context.Context, + obj *apiv1.Subscription, + connString string, +) error { + db, err := r.instance.ConnectionPool().Connection(obj.Spec.DBName) + if err != nil { + return fmt.Errorf("while getting DB connection: %w", err) + } + + row := db.QueryRowContext( + ctx, + ` + SELECT count(*) + FROM pg_subscription + WHERE subname = $1 + `, + obj.Spec.Name) + if row.Err() != nil { + return fmt.Errorf("while getting subscription status: %w", row.Err()) + } + + var count int + if err := row.Scan(&count); err != nil { + return fmt.Errorf("while getting subscription status (scan): %w", err) + } + + if count > 0 { + if err := r.patchSubscription(ctx, db, obj, connString); err != nil { + return fmt.Errorf("while patching subscription: %w", err) + } + return nil + } + + if err := r.createSubscription(ctx, db, obj, connString); err != nil { + return fmt.Errorf("while creating subscription: %w", err) + } + + return nil +} + +func (r *SubscriptionReconciler) patchSubscription( + ctx context.Context, + db *sql.DB, + obj *apiv1.Subscription, + connString string, +) error { + sqls := toSubscriptionAlterSQL(obj, connString) + for _, sqlQuery := range sqls { + if _, err := db.ExecContext(ctx, sqlQuery); err != nil { + return err + } + } + + return nil +} + +func (r *SubscriptionReconciler) createSubscription( + ctx context.Context, + db *sql.DB, + obj *apiv1.Subscription, + connString string, +) error { + sqls := toSubscriptionCreateSQL(obj, connString) + for _, sqlQuery := range sqls { + if _, err := db.ExecContext(ctx, sqlQuery); err != nil { + return err + } + } + + return nil +} + +func toSubscriptionCreateSQL(obj *apiv1.Subscription, connString string) []string { + result := make([]string, 0, 2) + + createQuery := fmt.Sprintf( + "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s", + pgx.Identifier{obj.Spec.Name}.Sanitize(), + pq.QuoteLiteral(connString), + pgx.Identifier{obj.Spec.PublicationName}.Sanitize(), + ) + if len(obj.Spec.Parameters) > 0 { + createQuery = fmt.Sprintf("%s WITH (%s)", createQuery, obj.Spec.Parameters) + } + result = append(result, createQuery) + + if len(obj.Spec.Owner) > 0 { + result = append(result, + fmt.Sprintf( + "ALTER SUBSCRIPTION %s OWNER TO %s", + pgx.Identifier{obj.Spec.Name}.Sanitize(), + pgx.Identifier{obj.Spec.Owner}.Sanitize(), + ), + ) + } + + return result +} + +func toSubscriptionAlterSQL(obj *apiv1.Subscription, connString string) []string { + result := make([]string, 0, 4) + + setPublicationSQL := fmt.Sprintf( + "ALTER SUBSCRIPTION %s SET PUBLICATION %s", + pgx.Identifier{obj.Spec.Name}.Sanitize(), + pgx.Identifier{obj.Spec.PublicationName}.Sanitize(), + ) + + setConnStringSQL := fmt.Sprintf( + "ALTER SUBSCRIPTION %s CONNECTION %s", + pgx.Identifier{obj.Spec.Name}.Sanitize(), + pq.QuoteLiteral(connString), + ) + result = append(result, setPublicationSQL, setConnStringSQL) + + if len(obj.Spec.Owner) > 0 { + result = append(result, + fmt.Sprintf( + "ALTER SUBSCRIPTION %s OWNER TO %s", + pgx.Identifier{obj.Spec.Name}.Sanitize(), + pgx.Identifier{obj.Spec.Owner}.Sanitize(), + ), + ) + } + + if len(obj.Spec.Parameters) > 0 { + result = append(result, + fmt.Sprintf( + "ALTER SUBSCRIPTION %s SET (%s)", + result, + obj.Spec.Parameters, + ), + ) + } + + return result +} + +func (r *SubscriptionReconciler) dropSubscription(ctx context.Context, obj *apiv1.Subscription) error { + db, err := r.instance.ConnectionPool().Connection(obj.Spec.DBName) + if err != nil { + return fmt.Errorf("while getting DB connection: %w", err) + } + + if _, err := db.ExecContext( + ctx, + fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", pgx.Identifier{obj.Spec.Name}.Sanitize()), + ); err != nil { + return fmt.Errorf("while dropping subscription: %w", err) + } + + return nil +} diff --git a/tests/e2e/declarative_pub_sub_test.go b/tests/e2e/declarative_pub_sub_test.go new file mode 100644 index 0000000000..df8caf702f --- /dev/null +++ b/tests/e2e/declarative_pub_sub_test.go @@ -0,0 +1 @@ +package e2e