Skip to content

Commit

Permalink
chore: add EventuallyExecQueryInInstancePod for queries that should retry on error
Browse files Browse the repository at this point in the history

Signed-off-by: Niccolò Fei <[email protected]>
  • Loading branch information
NiccoloFei committed Oct 7, 2024
1 parent 317bf43 commit ffe6cc7
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 61 deletions.
63 changes: 46 additions & 17 deletions tests/e2e/asserts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func AssertCreateTestData(env *testsUtils.TestingEnvironment, tl TableLocator) {
}

// AssertCreateTestDataLargeObject creates a large object with the given oid and data
func AssertCreateTestDataLargeObject(namespace, clusterName string, oid int, data string, pod *corev1.Pod) {
func AssertCreateTestDataLargeObject(namespace, clusterName string, oid int, data string) {
By("creating large object", func() {
query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS image (name text,raster oid); "+
"INSERT INTO image (name, raster) VALUES ('beautiful image', lo_from_bytea(%d, '%s'));", oid, data)
Expand Down Expand Up @@ -572,7 +572,7 @@ func AssertDataExpectedCount(
}

// AssertLargeObjectValue verifies the presence of a Large Object given by its OID and data
func AssertLargeObjectValue(namespace, clusterName string, oid int, data string, pod *corev1.Pod) {
func AssertLargeObjectValue(namespace, clusterName string, oid int, data string) {
By("verifying large object", func() {
query := fmt.Sprintf("SELECT encode(lo_get(%v), 'escape');", oid)
Eventually(func() (string, error) {
Expand All @@ -598,16 +598,21 @@ func AssertLargeObjectValue(namespace, clusterName string, oid int, data string,

// AssertClusterStandbysAreStreaming verifies that all the standbys of a cluster have a wal-receiver running.
func AssertClusterStandbysAreStreaming(namespace string, clusterName string, timeout int32) {
query := "SELECT count(*) FROM pg_stat_wal_receiver"
Eventually(func() error {
standbyPods, err := env.GetClusterReplicas(namespace, clusterName)
if err != nil {
return err
}

for _, pod := range standbyPods.Items {
timeout := time.Second * 10
out, _, err := env.EventuallyExecCommand(env.Ctx, pod, specs.PostgresContainerName, &timeout,
"psql", "-U", "postgres", "-tAc", "SELECT count(*) FROM pg_stat_wal_receiver")
out, _, err := env.ExecQueryInInstancePod(
testsUtils.PodLocator{
Namespace: pod.Namespace,
PodName: pod.Name,
},
testsUtils.PostgresDBName,
query)
if err != nil {
return err
}
Expand Down Expand Up @@ -701,12 +706,18 @@ func AssertWritesResumedBeforeTimeout(namespace string, clusterName string, time
Name: podName,
}
var switchTime float64
commandTimeout := time.Second * 10
pod := &corev1.Pod{}
err := env.Client.Get(env.Ctx, namespacedName, pod)
Expect(err).ToNot(HaveOccurred())
out, _, err := env.EventuallyExecCommand(env.Ctx, *pod, specs.PostgresContainerName,
&commandTimeout, "psql", "-U", "postgres", "app", "-tAc", query)
out, _, err := env.EventuallyExecQueryInInstancePod(
testsUtils.PodLocator{
Namespace: pod.Namespace,
PodName: pod.Name,
}, testsUtils.AppDBName,
query,
RetryTimeout,
PollingTime,
)
Expect(err).ToNot(HaveOccurred())
switchTime, err = strconv.ParseFloat(strings.TrimSpace(out), 64)
if err != nil {
Expand Down Expand Up @@ -747,7 +758,6 @@ func AssertNewPrimary(namespace string, clusterName string, oldPrimary string) {
newPrimaryPod = newPrimary
})
By(fmt.Sprintf("verifying write operation on the new primary pod: %s", newPrimaryPod), func() {
commandTimeout := time.Second * 10
namespacedName := types.NamespacedName{
Namespace: namespace,
Name: newPrimaryPod,
Expand All @@ -757,8 +767,15 @@ func AssertNewPrimary(namespace string, clusterName string, oldPrimary string) {
Expect(err).ToNot(HaveOccurred())
// Expect write operation to succeed
query := "CREATE TABLE IF NOT EXISTS assert_new_primary(var1 text);"
_, _, err = env.EventuallyExecCommand(env.Ctx, pod, specs.PostgresContainerName,
&commandTimeout, "psql", "-U", "postgres", "app", "-tAc", query)
_, _, err = env.EventuallyExecQueryInInstancePod(
testsUtils.PodLocator{
Namespace: pod.Namespace,
PodName: pod.Name,
}, testsUtils.AppDBName,
query,
RetryTimeout,
PollingTime,
)
Expect(err).ToNot(HaveOccurred())
})
}
Expand Down Expand Up @@ -1008,7 +1025,6 @@ func AssertDetachReplicaModeCluster(
testTableName string,
) {
var primaryReplicaCluster *corev1.Pod
replicaCommandTimeout := time.Second * 10

var referenceTime time.Time
By("taking the reference time before the detaching", func() {
Expand Down Expand Up @@ -1055,8 +1071,13 @@ func AssertDetachReplicaModeCluster(
// Get primary from replica cluster
primaryReplicaCluster, err = env.GetClusterPrimary(namespace, replicaClusterName)
g.Expect(err).ToNot(HaveOccurred())
_, _, err = env.EventuallyExecCommand(env.Ctx, *primaryReplicaCluster, specs.PostgresContainerName,
&replicaCommandTimeout, "psql", "-U", "postgres", srcDatabaseName, "-tAc", query)
_, _, err = env.ExecQueryInInstancePod(
testsUtils.PodLocator{
Namespace: primaryReplicaCluster.Namespace,
PodName: primaryReplicaCluster.Name,
}, testsUtils.DatabaseName(srcDatabaseName),
query,
)
g.Expect(err).ToNot(HaveOccurred())
}, 300, 15).Should(Succeed())
})
Expand All @@ -1079,8 +1100,15 @@ func AssertDetachReplicaModeCluster(
})

By("verifying that replica cluster was not modified", func() {
outTables, stdErr, err := env.EventuallyExecCommand(env.Ctx, *primaryReplicaCluster, specs.PostgresContainerName,
&replicaCommandTimeout, "psql", "-U", "postgres", srcDatabaseName, "-tAc", "\\dt")
outTables, stdErr, err := env.EventuallyExecQueryInInstancePod(
testsUtils.PodLocator{
Namespace: primaryReplicaCluster.Namespace,
PodName: primaryReplicaCluster.Name,
}, testsUtils.DatabaseName(srcDatabaseName),
"\\dt",
RetryTimeout,
PollingTime,
)
if err != nil {
GinkgoWriter.Printf("stdout: %v\nstderr: %v\n", outTables, stdErr)
}
Expand Down Expand Up @@ -2678,7 +2706,8 @@ func DeleteTableUsingPgBouncerService(
AssertConnection(poolerService, appUser, "app", generatedAppUserPassword, pod, 180, env)

connectionTimeout := time.Second * 10
dsn := testsUtils.CreateDSN(poolerService, appUser, testsUtils.AppDBName, generatedAppUserPassword, testsUtils.Require, 5432)
dsn := testsUtils.CreateDSN(poolerService, appUser, testsUtils.AppDBName, generatedAppUserPassword,
testsUtils.Require, 5432)
_, _, err = env.EventuallyExecCommand(env.Ctx, *pod, specs.PostgresContainerName, &connectionTimeout,
"psql", dsn, "-tAc", "DROP TABLE table1")
Expect(err).ToNot(HaveOccurred())
Expand Down
6 changes: 2 additions & 4 deletions tests/e2e/cluster_microservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ var _ = Describe("Imports with Microservice Approach", Label(tests.LabelImportin
TableName: tableName,
}
AssertCreateTestData(env, tableLocator)
primaryPod, err := env.GetClusterPrimary(namespace, sourceClusterName)
Expect(err).ToNot(HaveOccurred())
AssertCreateTestDataLargeObject(namespace, sourceClusterName, oid, data, primaryPod)
AssertCreateTestDataLargeObject(namespace, sourceClusterName, oid, data)

importedClusterName = "cluster-pgdump-large-object"
cluster := AssertClusterImport(namespace, importedClusterName, sourceClusterName, "app")
Expand All @@ -89,7 +87,7 @@ var _ = Describe("Imports with Microservice Approach", Label(tests.LabelImportin
TableName: tableName,
}
AssertDataExpectedCount(env, tableLocator, 2)
AssertLargeObjectValue(namespace, importedClusterName, oid, data, primaryPod)
AssertLargeObjectValue(namespace, importedClusterName, oid, data)
By("deleting the imported database", func() {
Expect(testsUtils.DeleteObject(env, cluster)).To(Succeed())
})
Expand Down
14 changes: 10 additions & 4 deletions tests/e2e/configuration_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,6 @@ var _ = Describe("Configuration update with primaryUpdateMethod", Label(tests.La
})

By("verifying that old primary was actually restarted", func() {
commandTimeout := time.Second * 10
pod := corev1.Pod{}
err := env.Client.Get(env.Ctx, types.NamespacedName{
Namespace: namespace,
Expand All @@ -591,9 +590,16 @@ var _ = Describe("Configuration update with primaryUpdateMethod", Label(tests.La
Expect(err).ToNot(HaveOccurred())

// take pg postmaster start time
stdout, _, cmdErr := env.EventuallyExecCommand(env.Ctx, pod, specs.PostgresContainerName, &commandTimeout,
"psql", "-U", "postgres", "-tAc",
"select to_char(pg_postmaster_start_time(), 'YYYY-MM-DD HH24:MI:SS');")
query := "select to_char(pg_postmaster_start_time(), 'YYYY-MM-DD HH24:MI:SS');"
stdout, _, cmdErr := env.EventuallyExecQueryInInstancePod(
utils.PodLocator{
Namespace: pod.Namespace,
PodName: pod.Name,
}, utils.PostgresDBName,
query,
RetryTimeout,
PollingTime,
)
Expect(cmdErr).ToNot(HaveOccurred())

newStartTime, err := cnpgTypes.ParseTargetTime(nil, strings.Trim(stdout, "\n"))
Expand Down
66 changes: 48 additions & 18 deletions tests/e2e/failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,15 @@ var _ = Describe("Failover", Label(tests.LabelSelfHealing), func() {

// Get the walreceiver pid
query := "SELECT pid FROM pg_stat_activity WHERE backend_type = 'walreceiver'"
out, _, err := env.EventuallyExecCommand(
env.Ctx, *pausedPod, specs.PostgresContainerName, &commandTimeout,
"psql", "-U", "postgres", "-tAc", query)
out, _, err := env.EventuallyExecQueryInInstancePod(
utils.PodLocator{
Namespace: pausedPod.Namespace,
PodName: pausedPod.Name,
}, utils.PostgresDBName,
query,
RetryTimeout,
PollingTime,
)
Expect(err).ToNot(HaveOccurred())
pid = strings.Trim(out, "\n")

Expand All @@ -94,9 +100,15 @@ var _ = Describe("Failover", Label(tests.LabelSelfHealing), func() {
// We don't want to wait for the replication timeout.
query = fmt.Sprintf("SELECT pg_terminate_backend(pid) FROM pg_stat_replication "+
"WHERE application_name = '%v'", pausedReplica)
_, _, err = env.EventuallyExecCommand(
env.Ctx, *primaryPod, specs.PostgresContainerName, &commandTimeout,
"psql", "-U", "postgres", "-tAc", query)
_, _, err = env.EventuallyExecQueryInInstancePod(
utils.PodLocator{
Namespace: primaryPod.Namespace,
PodName: primaryPod.Name,
}, utils.PostgresDBName,
query,
RetryTimeout,
PollingTime,
)
Expect(err).ToNot(HaveOccurred())

// Expect the primary to have lost connection with the stopped standby
Expand All @@ -114,28 +126,46 @@ var _ = Describe("Failover", Label(tests.LabelSelfHealing), func() {
Expect(err).ToNot(HaveOccurred())

// Gather the current WAL LSN
initialLSN, _, err := env.EventuallyExecCommand(
env.Ctx, *primaryPod, specs.PostgresContainerName, &commandTimeout,
"psql", "-U", "postgres", "-tAc", "SELECT pg_current_wal_lsn()")
initialLSN, _, err := env.EventuallyExecQueryInInstancePod(
utils.PodLocator{
Namespace: primaryPod.Namespace,
PodName: primaryPod.Name,
}, utils.PostgresDBName,
"SELECT pg_current_wal_lsn()",
RetryTimeout,
PollingTime,
)
Expect(err).ToNot(HaveOccurred())

// Execute a checkpoint
_, _, err = env.EventuallyExecCommand(
env.Ctx, *primaryPod, specs.PostgresContainerName, &commandTimeout,
"psql", "-U", "postgres", "-tAc", "CHECKPOINT")
_, _, err = env.EventuallyExecQueryInInstancePod(
utils.PodLocator{
Namespace: primaryPod.Namespace,
PodName: primaryPod.Name,
}, utils.PostgresDBName,
"CHECKPOINT",
RetryTimeout,
PollingTime,
)
Expect(err).ToNot(HaveOccurred())

query := fmt.Sprintf("SELECT true FROM pg_stat_replication "+
"WHERE application_name = '%v' AND replay_lsn > '%v'",
targetPrimary, strings.Trim(initialLSN, "\n"))
// The replay_lsn of the targetPrimary should be ahead
// of the one before the checkpoint
Eventually(func() (string, error) {
primaryPod, err = env.GetPod(namespace, currentPrimary)
Expect(err).ToNot(HaveOccurred())
query := fmt.Sprintf("SELECT true FROM pg_stat_replication "+
"WHERE application_name = '%v' AND replay_lsn > '%v'",
targetPrimary, strings.Trim(initialLSN, "\n"))
out, _, err := env.EventuallyExecCommand(
env.Ctx, *primaryPod, specs.PostgresContainerName, &commandTimeout,
"psql", "-U", "postgres", "-tAc", query)
out, _, err := env.EventuallyExecQueryInInstancePod(
utils.PodLocator{
Namespace: primaryPod.Namespace,
PodName: primaryPod.Name,
}, utils.PostgresDBName,
query,
RetryTimeout,
PollingTime,
)
return strings.TrimSpace(out), err
}, RetryTimeout).Should(BeEquivalentTo("t"))
})
Expand Down
13 changes: 9 additions & 4 deletions tests/e2e/tablespaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ var _ = Describe("Tablespaces tests", Label(tests.LabelTablespaces,
tablespace2 = "tbs2"
table2 = "test_tbs2"
)
checkPointTimeout := time.Second * 10

BeforeAll(func() {
// Create a cluster in a namespace we'll delete after the test
Expand Down Expand Up @@ -437,9 +436,15 @@ var _ = Describe("Tablespaces tests", Label(tests.LabelTablespaces,
primaryPod, err := env.GetClusterPrimary(namespace, clusterName)
Expect(err).ToNot(HaveOccurred())
// Execute a checkpoint
_, _, err = env.EventuallyExecCommand(
env.Ctx, *primaryPod, specs.PostgresContainerName, &checkPointTimeout,
"psql", "-U", "postgres", "-tAc", "CHECKPOINT")
_, _, err = env.EventuallyExecQueryInInstancePod(
testUtils.PodLocator{
Namespace: primaryPod.Namespace,
PodName: primaryPod.Name,
}, testUtils.PostgresDBName,
"CHECKPOINT",
RetryTimeout,
PollingTime,
)
Expect(err).ToNot(HaveOccurred())
})

Expand Down
25 changes: 18 additions & 7 deletions tests/e2e/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cloudnative-pg/pkg/specs"
"github.com/cloudnative-pg/cloudnative-pg/tests"
testsUtils "github.com/cloudnative-pg/cloudnative-pg/tests/utils"

Expand Down Expand Up @@ -246,10 +245,16 @@ var _ = Describe("Upgrade", Label(tests.LabelUpgrade, tests.LabelNoOpenshift), O
primary, err := env.GetClusterPrimary(upgradeNamespace, clusterName)
Expect(err).ToNot(HaveOccurred())

commandTimeout := time.Second * 10
query := "CREATE TABLE IF NOT EXISTS postswitch(i int);"
_, _, err = env.EventuallyExecCommand(env.Ctx, *primary, specs.PostgresContainerName, &commandTimeout,
"psql", "-U", "postgres", databaseName, "-tAc", query)
_, _, err = env.EventuallyExecQueryInInstancePod(
testsUtils.PodLocator{
Namespace: primary.Namespace,
PodName: primary.Name,
}, testsUtils.DatabaseName(databaseName),
query,
RetryTimeout,
PollingTime,
)
Expect(err).ToNot(HaveOccurred())

for i := 1; i < 4; i++ {
Expand Down Expand Up @@ -505,10 +510,16 @@ var _ = Describe("Upgrade", Label(tests.LabelUpgrade, tests.LabelNoOpenshift), O
primary, err := env.GetClusterPrimary(upgradeNamespace, clusterName1)
Expect(err).ToNot(HaveOccurred())

commandTimeout := time.Second * 10
query := "CREATE TABLE IF NOT EXISTS to_restore AS VALUES (1),(2);"
_, _, err = env.EventuallyExecCommand(env.Ctx, *primary, specs.PostgresContainerName, &commandTimeout,
"psql", "-U", "postgres", databaseName, "-tAc", query)
_, _, err = env.EventuallyExecQueryInInstancePod(
testsUtils.PodLocator{
Namespace: primary.Namespace,
PodName: primary.Name,
}, testsUtils.DatabaseName(databaseName),
query,
RetryTimeout,
PollingTime,
)
Expect(err).ToNot(HaveOccurred())
})

Expand Down
Loading

0 comments on commit ffe6cc7

Please sign in to comment.