Skip to content

Commit

Permalink
chore: split SQL functions in separated files
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Quaresima <[email protected]>
  • Loading branch information
gabriele-wolfox committed Oct 14, 2024
1 parent a0e8f4c commit c0cfeb3
Show file tree
Hide file tree
Showing 6 changed files with 391 additions and 343 deletions.
2 changes: 1 addition & 1 deletion config/olm-samples/postgresql_v1_subscription.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ spec:
publicationName: pub
cluster:
name: cluster-example-dest
externalClusterName: cluster-example
externalClusterName: cluster-example
182 changes: 3 additions & 179 deletions internal/management/controller/publication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@ package controller

import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"time"

"github.com/cloudnative-pg/machinery/pkg/log"
"github.com/jackc/pgx/v5"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -165,7 +162,7 @@ func (r *PublicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
)
}

return r.succeededRenconciliation(
return r.succeededReconciliation(
ctx,
&publication,
)
Expand Down Expand Up @@ -201,8 +198,7 @@ func (r *PublicationReconciler) failedReconciliation(

var statusError *instance.StatusError
if errors.As(err, &statusError) {
// The body line of the instance manager contain the human
// readable error
// The body line of the instance manager contain the human-readable error
publication.Status.Error = statusError.Body
}

Expand All @@ -216,7 +212,7 @@ func (r *PublicationReconciler) failedReconciliation(
}

// succeededReconciliation marks the reconciliation as succeeded
func (r *PublicationReconciler) succeededRenconciliation(
func (r *PublicationReconciler) succeededReconciliation(
ctx context.Context,
publication *apiv1.Publication,
) (ctrl.Result, error) {
Expand Down Expand Up @@ -249,175 +245,3 @@ func (r *PublicationReconciler) GetCluster(ctx context.Context) (*apiv1.Cluster,

return &cluster, nil
}

func (r *PublicationReconciler) alignPublication(ctx context.Context, obj *apiv1.Publication) error {
db, err := r.instance.ConnectionPool().Connection(obj.Spec.DBName)
if err != nil {
return fmt.Errorf("while getting DB connection: %w", err)
}

row := db.QueryRowContext(
ctx,
`
SELECT count(*)
FROM pg_publication
WHERE pubname = $1
`,
obj.Spec.Name)
if row.Err() != nil {
return fmt.Errorf("while getting publication status: %w", row.Err())
}

var count int
if err := row.Scan(&count); err != nil {
return fmt.Errorf("while getting publication status (scan): %w", err)
}

if count > 0 {
if err := r.patchPublication(ctx, db, obj); err != nil {
return fmt.Errorf("while patching publication: %w", err)
}
return nil
}

if err := r.createPublication(ctx, db, obj); err != nil {
return fmt.Errorf("while creating publication: %w", err)
}

return nil
}

func (r *PublicationReconciler) patchPublication(
ctx context.Context,
db *sql.DB,
obj *apiv1.Publication,
) error {
sqls := toPublicationAlterSQL(obj)
for _, sqlQuery := range sqls {
if _, err := db.ExecContext(ctx, sqlQuery); err != nil {
return err
}
}

return nil
}

func (r *PublicationReconciler) createPublication(
ctx context.Context,
db *sql.DB,
obj *apiv1.Publication,
) error {
sqls := toPublicationCreateSQL(obj)
for _, sqlQuery := range sqls {
if _, err := db.ExecContext(ctx, sqlQuery); err != nil {
return err
}
}
return nil
}

func toPublicationCreateSQL(obj *apiv1.Publication) []string {
result := make([]string, 0, 2)

result = append(result,
fmt.Sprintf(
"CREATE PUBLICATION %s %s",
pgx.Identifier{obj.Spec.Name}.Sanitize(),
toPublicationTargetSQL(&obj.Spec.Target),
),
)

if len(obj.Spec.Owner) > 0 {
result = append(result,
fmt.Sprintf(
"ALTER PUBLICATION %s OWNER to %s",
pgx.Identifier{obj.Spec.Name}.Sanitize(),
toPublicationTargetSQL(&obj.Spec.Target),
),
)
}

if len(obj.Spec.Parameters) > 0 {
result = append(result,
fmt.Sprintf("%s WITH (%s)", result, obj.Spec.Parameters),
)
}

return result
}

func toPublicationAlterSQL(obj *apiv1.Publication) []string {
result := make([]string, 0, 3)
result = append(result,
fmt.Sprintf(
"ALTER PUBLICATION %s SET %s",
pgx.Identifier{obj.Spec.Name}.Sanitize(),
toPublicationTargetSQL(&obj.Spec.Target),
),
)

if len(obj.Spec.Owner) > 0 {
result = append(result,
fmt.Sprintf(
"ALTER PUBLICATION %s OWNER TO %s",
pgx.Identifier{obj.Spec.Name}.Sanitize(),
pgx.Identifier{obj.Spec.Owner}.Sanitize(),
),
)
}

if len(obj.Spec.Parameters) > 0 {
result = append(result,
fmt.Sprintf(
"ALTER PUBLICATION %s SET (%s)",
result,
obj.Spec.Parameters,
),
)
}

return result
}

func (r *PublicationReconciler) dropPublication(ctx context.Context, obj *apiv1.Publication) error {
db, err := r.instance.ConnectionPool().Connection(obj.Spec.DBName)
if err != nil {
return fmt.Errorf("while getting DB connection: %w", err)
}

if _, err := db.ExecContext(
ctx,
fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", pgx.Identifier{obj.Spec.Name}.Sanitize()),
); err != nil {
return fmt.Errorf("while dropping publication: %w", err)
}

return nil
}

func toPublicationTargetSQL(obj *apiv1.PublicationTarget) string {
if obj.AllTables != nil {
return "FOR ALL TABLES"
}

result := ""
for _, object := range obj.Objects {
if len(result) > 0 {
result += ", "
}
result += toPublicationObjectSQL(&object)
}

if len(result) > 0 {
result = fmt.Sprintf("FOR %s", result)
}
return result
}

func toPublicationObjectSQL(obj *apiv1.PublicationTargetObject) string {
if len(obj.Schema) > 0 {
return fmt.Sprintf("TABLES IN SCHEMA %s", pgx.Identifier{obj.Schema}.Sanitize())
}

return fmt.Sprintf("TABLE %s", strings.Join(obj.TableExpression, ", "))
}
Loading

0 comments on commit c0cfeb3

Please sign in to comment.