Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: declarative publications and subscriptions #115

Closed
Closed
Changes from 1 commit
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
a3b0fdb
fix: ensure former primary WALs are flushed before resyncing (#6141)
armru Nov 26, 2024
9444ebc
fix: remove spurious log line on walarchive failure (#6169)
mnencia Nov 26, 2024
9653936
fix(deps): update all non-major go dependencies (main) (#6131)
renovate[bot] Nov 27, 2024
b656983
chore(deps): update cloudnative-pg/ciclops action to v1.3.1 (main) (#…
renovate[bot] Nov 27, 2024
0617522
chore(deps): update dependency rook/rook to v1.15.6 (main) (#6158)
renovate[bot] Nov 27, 2024
e8fb1e7
chore(deps): update operator framework to v1.38.0 (main) (#6186)
renovate[bot] Nov 27, 2024
d08d8d9
feat: declarative publications and subscriptions
leonardoce Aug 20, 2024
fede41e
fix: instance getter usage
gabriele-wolfox Oct 10, 2024
44a3343
fix: manifests and linter
gabriele-wolfox Oct 10, 2024
7a99cc5
fix: config CRD
gabriele-wolfox Oct 10, 2024
a489540
chore: add CRD examples for OLM build
gabriele-wolfox Oct 10, 2024
b2762d5
chore: split SQL functions in separated files
gabriele-wolfox Oct 14, 2024
a5ec691
test: add basic e2e
gabriele-wolfox Oct 14, 2024
a7b91ce
test: add basic e2e
gabriele-wolfox Oct 14, 2024
0c322a7
fix: update CRDs
gabriele-wolfox Oct 14, 2024
d944b2c
fix: owner set, e2e and conflict after rebase
gabriele-wolfox Oct 15, 2024
90e531b
fix: e2e manifests and code bug
gabriele-wolfox Oct 15, 2024
979a935
test: remove owners from manifests, relying on default
gabriele-wolfox Oct 16, 2024
f333f02
test: add data inside table and assertion on replication
gabriele-wolfox Oct 16, 2024
03ba3a9
chore: simplify field type inside spec and add docs
gabriele-wolfox Oct 16, 2024
902b268
chore: fix scaffolding
NiccoloFei Oct 18, 2024
6a1d6c7
test: review E2E
NiccoloFei Oct 18, 2024
af18afe
test: more e2e fixes
NiccoloFei Oct 18, 2024
ea5acb0
test: patch finalizers before deletion
NiccoloFei Oct 18, 2024
8aec8bf
test(e2e): handle object notFound
NiccoloFei Oct 21, 2024
2b8921c
review: improve doc, fix sample
jsilvela Oct 21, 2024
4598b82
Update config/olm-samples/postgresql_v1_publication.yaml
jsilvela Oct 22, 2024
31d0c02
Update docs/src/samples/cluster-example-logical-source.yaml
jsilvela Oct 22, 2024
28edc80
chore: de-template text fixtures
jsilvela Oct 22, 2024
4178d9e
chore: improve operation order of the publication_controller
armru Oct 22, 2024
2f037fa
chore: add reconcilefinalizer and remove nestif exception
armru Oct 22, 2024
f754d97
fix: mark publication as failed if we encounter `Get` errors while ob…
armru Oct 22, 2024
3dbe2da
refactor: uniform failed behaviour between subscription and publications
armru Oct 22, 2024
d5e9347
refactor: uniform mark as ready logic
armru Oct 22, 2024
eeca4be
refactor: uniform getCluster logic, fix regression about intervals
armru Oct 22, 2024
8c996ef
refactor: centralize finalizer logic
armru Oct 23, 2024
67edec2
chore: improve getCluster
armru Oct 23, 2024
4747548
refactor: prepare sql methods to be unit tested
armru Oct 23, 2024
9be5a89
fix: make alterSql handle parameters correctly
armru Oct 23, 2024
fd3b6a5
fix: create publication sql
armru Oct 23, 2024
1b68c73
test(subscription): add basic coverage
armru Oct 23, 2024
01d3775
chore: fixing order and linting
NiccoloFei Oct 23, 2024
1095e3b
chore: review
mnencia Oct 29, 2024
5cf243c
chore: improve readability
mnencia Oct 30, 2024
9a9a5a9
chore: more fixes
mnencia Oct 30, 2024
424390a
test: review
mnencia Oct 30, 2024
7d46774
chore: quote identifier in parameters
mnencia Oct 30, 2024
8780705
chore: Remove the owner field
mnencia Oct 31, 2024
08264b6
chore: review
armru Oct 31, 2024
6133524
refactor: remove unknown status
armru Oct 31, 2024
58d0496
refactor: change `toPublicationCreateSQL` signature
armru Oct 31, 2024
eecc9de
revert: "refactor: remove unknown status"
mnencia Oct 31, 2024
54a37bb
fix: handle finalizers on cluster deletion
mnencia Nov 6, 2024
9562e96
refactor: finalizers
mnencia Nov 6, 2024
796c52f
docs: renamed page into Logical Replication
gbartolini Nov 25, 2024
3b140fa
docs: intro
gbartolini Nov 25, 2024
2148e7e
docs: publications
gbartolini Nov 26, 2024
984e788
docs: review publications
gbartolini Nov 26, 2024
4dea38d
docs: subscriptions
gbartolini Nov 26, 2024
a0d6ac8
docs: remove superuser requirement from examples
gbartolini Nov 27, 2024
838737a
docs: example
gbartolini Nov 27, 2024
89aec95
review
gbartolini Nov 27, 2024
a36bb57
docs: review publication API
gbartolini Nov 27, 2024
978a45e
docs: review subscription api
gbartolini Nov 27, 2024
7c08299
docs: add operator capability level and index
gbartolini Nov 27, 2024
56b7f0c
chore: docs review
mnencia Nov 27, 2024
be0d8a2
docs: minor fixes
NiccoloFei Nov 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: e2e manifests and code bug
Signed-off-by: Gabriele Quaresima <[email protected]>
gabriele-wolfox authored and mnencia committed Nov 27, 2024
commit 90e531b5863bdc2e47cf7cd3eb17fb03ec6f6134
21 changes: 14 additions & 7 deletions internal/management/controller/publication_controller_sql.go
Original file line number Diff line number Diff line change
@@ -125,13 +125,16 @@ func toPublicationCreateSQL(obj *apiv1.Publication) []string {

func toPublicationAlterSQL(obj *apiv1.Publication) []string {
result := make([]string, 0, 3)
result = append(result,
fmt.Sprintf(
"ALTER PUBLICATION %s SET %s",
pgx.Identifier{obj.Spec.Name}.Sanitize(),
toPublicationTargetSQL(&obj.Spec.Target),
),
)

if len(obj.Spec.Target.Objects) > 0 {
result = append(result,
fmt.Sprintf(
"ALTER PUBLICATION %s SET %s",
pgx.Identifier{obj.Spec.Name}.Sanitize(),
toPublicationTargetObjectsSQL(&obj.Spec.Target),
),
)
}

if len(obj.Spec.Owner) > 0 {
result = append(result,
@@ -177,6 +180,10 @@ func toPublicationTargetSQL(obj *apiv1.PublicationTarget) string {
return "FOR ALL TABLES"
}

return toPublicationTargetObjectsSQL(obj)
}

func toPublicationTargetObjectsSQL(obj *apiv1.PublicationTarget) string {
result := ""
for _, object := range obj.Objects {
if len(result) > 0 {
15 changes: 13 additions & 2 deletions internal/management/controller/subscription_controller_sql.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"bytes"
"context"
"database/sql"
"fmt"
@@ -110,7 +111,7 @@ func toSubscriptionCreateSQL(obj *apiv1.Subscription, connString string) []strin
pgx.Identifier{obj.Spec.PublicationName}.Sanitize(),
)
if len(obj.Spec.Parameters) > 0 {
createQuery = fmt.Sprintf("%s WITH (%s)", createQuery, obj.Spec.Parameters)
createQuery = fmt.Sprintf("%s WITH (%s)", createQuery, toPostgresParameters(obj.Spec.Parameters))
}
result = append(result, createQuery)

@@ -127,6 +128,16 @@ func toSubscriptionCreateSQL(obj *apiv1.Subscription, connString string) []strin
return result
}

func toPostgresParameters(parameters map[string]string) string {
b := new(bytes.Buffer)
for key, value := range parameters {
_, _ = fmt.Fprintf(b, "%s = '%s', ", key, value)
}

// pruning last 2 chars `, `
return b.String()[:len(b.String())-2]
}

func toSubscriptionAlterSQL(obj *apiv1.Subscription, connString string) []string {
result := make([]string, 0, 4)

@@ -158,7 +169,7 @@ func toSubscriptionAlterSQL(obj *apiv1.Subscription, connString string) []string
fmt.Sprintf(
"ALTER SUBSCRIPTION %s SET (%s)",
result,
obj.Spec.Parameters,
toPostgresParameters(obj.Spec.Parameters),
),
)
}
35 changes: 35 additions & 0 deletions internal/management/controller/subscription_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright The CloudNativePG Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Conversion of PG parameters from map to string of key/value pairs", func() {
It("returns expected well-formed list", func() {
m := map[string]string{
"a": "1", "b": "2",
}
res := toPostgresParameters(m)
Expect(res).To(BeElementOf([]string{
`a = '1', b = '2'`,
`b = '2', a = '1'`,
}))
})
})
68 changes: 46 additions & 22 deletions tests/e2e/declarative_pub_sub_test.go
Original file line number Diff line number Diff line change
@@ -36,12 +36,13 @@ import (
var _ = Describe("Declarative publication and subscription test", Label(tests.LabelSmoke, tests.LabelBasic,
tests.LabelDeclarativePubSub), func() {
const (
sourceClusterManifest = fixturesDir + "/declarative_pub_sub/source-cluster.yaml.template"
destinationClusterManifest = fixturesDir + "/declarative_pub_sub/destination-cluster.yaml.template"
databaseManifest = fixturesDir + "/declarative_pub_sub/database.yaml.template"
pubManifest = fixturesDir + "/declarative_pub_sub/pub.yaml.template"
subManifest = fixturesDir + "/declarative_pub_sub/sub.yaml.template"
level = tests.Medium
sourceClusterManifest = fixturesDir + "/declarative_pub_sub/source-cluster.yaml.template"
destinationClusterManifest = fixturesDir + "/declarative_pub_sub/destination-cluster.yaml.template"
sourceDatabaseManifest = fixturesDir + "/declarative_pub_sub/source-database.yaml.template"
destinationDatabaseManifest = fixturesDir + "/declarative_pub_sub/destination-database.yaml.template"
pubManifest = fixturesDir + "/declarative_pub_sub/pub.yaml.template"
subManifest = fixturesDir + "/declarative_pub_sub/sub.yaml.template"
level = tests.Medium
)

BeforeEach(func() {
@@ -54,13 +55,11 @@ var _ = Describe("Declarative publication and subscription test", Label(tests.La
const (
namespacePrefix = "declarative-pub-sub"
dbname = "declarative"
pubName = "declarative-pub"
subName = "declarative-sub"
)
var (
sourceClusterName, destinationClusterName, namespace string
databaseObjectName, pubObjectName, subObjectName string
database *apiv1.Database
sourceDatabase, destinationDatabase *apiv1.Database
pub *apiv1.Publication
sub *apiv1.Subscription
err error
@@ -95,55 +94,80 @@ var _ = Describe("Declarative publication and subscription test", Label(tests.La
Namespace: namespace,
PodName: primaryPod,
},
"postgres",
dbname,
query)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(stdout).Should(ContainSubstring("1"), "expected publication not found")
}, 30).Should(Succeed())
}

assertSubscriptionExists := func(namespace, primaryPod string, sub *apiv1.Subscription) {
query := fmt.Sprintf("select count(*) from subscription where subname = '%s'",
query := fmt.Sprintf("select count(*) from pg_subscription where subname = '%s'",
sub.Spec.Name)
Eventually(func(g Gomega) {
stdout, _, err := env.ExecQueryInInstancePod(
utils.PodLocator{
Namespace: namespace,
PodName: primaryPod,
},
"postgres",
dbname,
query)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(stdout).Should(ContainSubstring("1"), "expected subscription not found")
}, 30).Should(Succeed())
}

It("can add a declarative database, publication and subscription", func() { //nolint:dupl
By("applying Database CRD manifest", func() {
CreateResourceFromFile(namespace, databaseManifest)
databaseObjectName, err = env.GetResourceNameFromYAML(databaseManifest)
It("can add declarative databases, publication and subscription", func() { //nolint:dupl
By("applying source Database CRD manifest", func() {
CreateResourceFromFile(namespace, sourceDatabaseManifest)
databaseObjectName, err = env.GetResourceNameFromYAML(sourceDatabaseManifest)
Expect(err).NotTo(HaveOccurred())
})
By("ensuring the Database CRD succeeded reconciliation", func() {
// get database object
database = &apiv1.Database{}
By("ensuring the source Database CRD succeeded reconciliation", func() {
// get source database object
sourceDatabase = &apiv1.Database{}
databaseNamespacedName := types.NamespacedName{
Namespace: namespace,
Name: databaseObjectName,
}

Eventually(func(g Gomega) {
err := env.Client.Get(env.Ctx, databaseNamespacedName, database)
err := env.Client.Get(env.Ctx, databaseNamespacedName, sourceDatabase)
Expect(err).ToNot(HaveOccurred())
g.Expect(database.Status.Ready).Should(BeTrue())
g.Expect(sourceDatabase.Status.Ready).Should(BeTrue())
}, 300).WithPolling(10 * time.Second).Should(Succeed())
})
By("verifying new database has been created", func() {
By("verifying source database has been created", func() {
primaryPodInfo, err := env.GetClusterPrimary(namespace, sourceClusterName)
Expect(err).ToNot(HaveOccurred())

AssertDatabaseExists(namespace, primaryPodInfo.Name, dbname, true)
})
By("applying destination Database CRD manifest", func() {
CreateResourceFromFile(namespace, destinationDatabaseManifest)
databaseObjectName, err = env.GetResourceNameFromYAML(destinationDatabaseManifest)
Expect(err).NotTo(HaveOccurred())
})
By("ensuring the destination Database CRD succeeded reconciliation", func() {
// get destination database object
destinationDatabase = &apiv1.Database{}
databaseNamespacedName := types.NamespacedName{
Namespace: namespace,
Name: databaseObjectName,
}

Eventually(func(g Gomega) {
err := env.Client.Get(env.Ctx, databaseNamespacedName, destinationDatabase)
Expect(err).ToNot(HaveOccurred())
g.Expect(destinationDatabase.Status.Ready).Should(BeTrue())
}, 300).WithPolling(10 * time.Second).Should(Succeed())
})
By("verifying destination database has been created", func() {
primaryPodInfo, err := env.GetClusterPrimary(namespace, destinationClusterName)
Expect(err).ToNot(HaveOccurred())

AssertDatabaseExists(namespace, primaryPodInfo.Name, dbname, true)
})
By("applying Publication CRD manifest", func() {
CreateResourceFromFile(namespace, pubManifest)
pubObjectName, err = env.GetResourceNameFromYAML(pubManifest)
Original file line number Diff line number Diff line change
@@ -11,6 +11,9 @@ spec:
user: app
dbname: declarative
port: "5432"
password:
name: source-cluster-app
key: password

postgresql:
parameters:
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: postgresql.cnpg.io/v1
kind: Database
metadata:
name: destination-db-declarative
spec:
name: declarative
owner: app
cluster:
name: destination-cluster
1 change: 1 addition & 0 deletions tests/e2e/fixtures/declarative_pub_sub/pub.yaml.template
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ metadata:
spec:
name: pub
dbname: declarative
owner: app
cluster:
name: source-cluster
target:
Original file line number Diff line number Diff line change
@@ -15,6 +15,16 @@ spec:
log_temp_files: '1024'
log_autovacuum_min_duration: '1s'
log_replication_commands: 'on'
pg_hba:
- hostssl replication app all scram-sha-256

managed:
roles:
- name: app
ensure: present
login: true
replication: true


# Example of rolling update strategy:
# - unsupervised: automated update of the primary once all
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: postgresql.cnpg.io/v1
kind: Database
metadata:
name: db-declarative
name: source-db-declarative
spec:
name: declarative
owner: app
2 changes: 1 addition & 1 deletion tests/e2e/fixtures/declarative_pub_sub/sub.yaml.template
Original file line number Diff line number Diff line change
@@ -5,8 +5,8 @@ metadata:
spec:
name: sub
dbname: declarative
owner: app
publicationName: pub
owner: app
cluster:
name: destination-cluster
externalClusterName: source-cluster