Skip to content

Commit

Permalink
Allow upgrading Postgres with timescaleDB extension. (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrit91 authored Feb 6, 2024
1 parent 67ce41f commit 1e01830
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 11 deletions.
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ Probably, it does not make sense to use this project with large databases. Howev
## Supported Databases

| Database | Image | Status | Upgrade Support |
|-------------|--------------|:------:|:---------------:|
| postgres | >= 12-alpine | beta ||
| rethinkdb | >= 2.4.0 | beta ||
| ETCD | >= 3.5 | alpha ||
| meilisearch | >= 1.2.0 | alpha ||
| redis | >= 6.0 | alpha ||
| keydb | >= 6.0 | alpha ||
| ----------- | ------------ | :----: | :-------------: |
| postgres | >= 12-alpine | beta ||
| rethinkdb | >= 2.4.0 | beta ||
| ETCD | >= 3.5 | alpha ||
| meilisearch | >= 1.2.0 | alpha ||
| redis | >= 6.0 | alpha ||
| keydb | >= 6.0 | alpha ||

Postgres also supports updates when using the TimescaleDB extension. Please consider the integration test for supported upgrade paths.

## Database Upgrades

Expand Down
83 changes: 82 additions & 1 deletion cmd/internal/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/metal-stack/backup-restore-sidecar/cmd/internal/utils"
"github.com/metal-stack/backup-restore-sidecar/pkg/constants"
"go.uber.org/zap"

_ "github.com/lib/pq"
)

const (
Expand Down Expand Up @@ -153,7 +155,7 @@ func (db *Postgres) Recover(ctx context.Context) error {
func (db *Postgres) Probe(ctx context.Context) error {
// TODO is postgres db OK ?
connString := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=postgres sslmode=disable", db.host, db.port, db.user, db.password)
var err error

dbc, err := sql.Open("postgres", connString)
if err != nil {
return fmt.Errorf("unable to open postgres connection %w", err)
Expand All @@ -164,5 +166,84 @@ func (db *Postgres) Probe(ctx context.Context) error {
if err != nil {
return fmt.Errorf("unable to ping postgres connection %w", err)
}

runsTimescaleDB, err := db.runningTimescaleDB(ctx, postgresConfigCmd)
if err == nil && runsTimescaleDB {
db.log.Infow("detected running timescaledb, running post-start hook to update timescaledb extension if necessary")

err = db.updateTimescaleDB(ctx, dbc)
if err != nil {
return fmt.Errorf("unable to update timescaledb: %w", err)
}
}

return nil
}

func (db *Postgres) updateTimescaleDB(ctx context.Context, dbc *sql.DB) error {
var (
databaseNames []string
)

databaseNameRows, err := dbc.QueryContext(ctx, "SELECT datname,datallowconn FROM pg_database")
if err != nil {
return fmt.Errorf("unable to get database names: %w", err)
}
defer databaseNameRows.Close()

for databaseNameRows.Next() {
var name string
var allowed bool
if err := databaseNameRows.Scan(&name, &allowed); err != nil {
return err
}

if allowed {
databaseNames = append(databaseNames, name)
}
}
if err := databaseNameRows.Err(); err != nil {
return err
}

for _, dbName := range databaseNames {
connString := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", db.host, db.port, db.user, db.password, dbName)
dbc2, err := sql.Open("postgres", connString)
if err != nil {
return fmt.Errorf("unable to open postgres connection %w", err)
}
defer dbc2.Close()

rows, err := dbc2.QueryContext(ctx, "SELECT extname FROM pg_extension")
if err != nil {
return fmt.Errorf("unable to get extensions: %w", err)
}
defer rows.Close()

for rows.Next() {
var extName string
if err := rows.Scan(&extName); err != nil {
return err
}

if extName != "timescaledb" {
continue
}

db.log.Infow("updating timescaledb extension", "db-name", dbName)

_, err = dbc2.ExecContext(ctx, "ALTER EXTENSION timescaledb UPDATE")
if err != nil {
return fmt.Errorf("unable to update extension: %w", err)
}

break
}

if err := rows.Err(); err != nil {
return err
}
}

return nil
}
52 changes: 51 additions & 1 deletion cmd/internal/database/postgres/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,24 @@ func (db *Postgres) Upgrade(ctx context.Context) error {
if err != nil {
return err
}
gid, err := strconv.Atoi(pgUser.Gid)
if err != nil {
return err
}

// remove /data/postgres-new if present
newDataDirTemp := path.Join("/data", "postgres-new")
newDataDirTemp := path.Join("/data", "postgres-new") // TODO: /data should not be hardcoded
err = os.RemoveAll(newDataDirTemp)
if err != nil {
db.log.Errorw("unable to remove new datadir, skipping upgrade", "error", err)
return nil
}

err = os.Chown("/data", uid, gid)
if err != nil {
return err
}

// initdb -D /data/postgres-new
cmd := exec.Command(postgresInitDBCmd, "-D", newDataDirTemp)
cmd.Stdout = os.Stdout
Expand Down Expand Up @@ -171,6 +180,24 @@ func (db *Postgres) Upgrade(ctx context.Context) error {
"--new-bindir", newPostgresBinDir,
"--link",
}

runsTimescaleDB, err := db.runningTimescaleDB(ctx, postgresConfigCmd)
if err != nil {
return err
}

if runsTimescaleDB {
// see https://github.com/timescale/timescaledb/issues/1844 and https://github.com/timescale/timescaledb/issues/4503#issuecomment-1860883843
db.log.Infow("running timescaledb, applying custom options for upgrade command")

// timescaledb libraries in this container are only compatible with the current postgres version
// do not load them anymore with the old postgresql server
pgUpgradeArgs = append(pgUpgradeArgs,
"--old-options", "-c shared_preload_libraries=''",
"--new-options", "-c timescaledb.restoring=on -c shared_preload_libraries=timescaledb",
)
}

cmd = exec.CommandContext(ctx, postgresUpgradeCmd, pgUpgradeArgs...) //nolint:gosec
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
Expand Down Expand Up @@ -255,6 +282,29 @@ func (db *Postgres) getBinDir(ctx context.Context, pgConfigCmd string) (string,
return strings.TrimSpace(string(out)), nil
}

func (db *Postgres) runningTimescaleDB(ctx context.Context, pgConfigCmd string) (bool, error) {
libDir, err := db.getLibDir(ctx, pgConfigCmd)
if err != nil {
return false, err
}

if _, err := os.Stat(path.Join(libDir, "timescaledb.so")); err == nil {
return true, nil
}

return false, nil
}

func (db *Postgres) getLibDir(ctx context.Context, pgConfigCmd string) (string, error) {
cmd := exec.CommandContext(ctx, pgConfigCmd, "--pkglibdir")
out, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("unable to figure out lib dir: %w", err)
}

return strings.TrimSpace(string(out)), nil
}

// copyPostgresBinaries is needed to save old postgres binaries for a later major upgrade
func (db *Postgres) copyPostgresBinaries(ctx context.Context, override bool) error {
binDir, err := db.getBinDir(ctx, postgresConfigCmd)
Expand Down
30 changes: 28 additions & 2 deletions cmd/internal/utils/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"os/exec"
"strings"
"time"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -52,7 +53,7 @@ func (c *CmdExecutor) ExecWithStreamingOutput(ctx context.Context, command strin

parts := strings.Fields(command)

cmd := exec.CommandContext(ctx, parts[0], parts[1:]...) // nolint:gosec
cmd := exec.Command(parts[0], parts[1:]...) // nolint:gosec

c.log.Debugw("running command", "command", cmd.Path, "args", cmd.Args)

Expand All @@ -61,5 +62,30 @@ func (c *CmdExecutor) ExecWithStreamingOutput(ctx context.Context, command strin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stdout

return cmd.Run()
err := cmd.Start()
if err != nil {
return err
}

go func() {
<-ctx.Done()

go func() {
time.Sleep(10 * time.Second)

c.log.Infow("force killing post-exec command now")
if err := cmd.Process.Signal(os.Kill); err != nil {
panic(err)
}
}()

c.log.Infow("sending sigint to post-exec command process")

err := cmd.Process.Signal(os.Interrupt)
if err != nil {
c.log.Errorw("unable to send interrupt to post-exec command", "error", err)
}
}()

return cmd.Wait()
}
104 changes: 104 additions & 0 deletions integration/postgres_timescaledb_upgrade_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//go:build integration

package integration_test

import (
"context"
"testing"

"github.com/metal-stack/backup-restore-sidecar/pkg/generate/examples/examples"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

_ "github.com/lib/pq"
)

func Test_Postgres_TimescaleDB_Upgrade(t *testing.T) {
backingResources := examples.PostgresBackingResources(namespaceName(t))

modified := false

for _, r := range backingResources {
cm, ok := r.(*corev1.ConfigMap)
if !ok {
continue
}

if cm.Name != "backup-restore-sidecar-config-postgres" {
continue
}

cm.Data = map[string]string{
"config.yaml": `---
bind-addr: 0.0.0.0
db: postgres
db-data-directory: /data/postgres/
backup-provider: local
backup-cron-schedule: "*/1 * * * *"
object-prefix: postgres-test
compression-method: tar
post-exec-cmds:
- docker-entrypoint.sh postgres -c shared_preload_libraries=timescaledb
`}

modified = true
break
}

require.True(t, modified)

upgradeFlow(t, &upgradeFlowSpec{
flowSpec: flowSpec{
databaseType: examples.Postgres,
sts: examples.PostgresSts,
backingResources: func(namespace string) []client.Object {
return backingResources
},
addTestData: addTimescaleDbTestData,
verifyTestData: verifyPostgresTestData,
},
databaseImages: []string{
"timescale/timescaledb:2.11.2-pg12",
"timescale/timescaledb:2.11.2-pg15",
// it is allowed to skip a minor version
// "timescale/timescaledb:2.12.2-pg15",
"timescale/timescaledb:2.13.1-pg15",
"timescale/timescaledb:2.13.1-pg16",
},
})
}

func addTimescaleDbTestData(t *testing.T, ctx context.Context) {
db := newPostgresSession(t, ctx)
defer db.Close()

var (
createStmt = `
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE TABLE IF NOT EXISTS backuprestore (
timestamp timestamp NOT NULL,
data text NOT NULL,
PRIMARY KEY(timestamp, data)
);
SELECT create_hypertable('backuprestore', 'timestamp', chunk_time_interval => INTERVAL '1 days', if_not_exists => TRUE);
ALTER TABLE backuprestore SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'data',
timescaledb.compress_orderby = 'timestamp'
);
SELECT add_compression_policy('backuprestore', INTERVAL '1 days');
`
insertStmt = `INSERT INTO backuprestore("timestamp", "data") VALUES ('2024-01-01 12:00:00.000', 'I am precious');`
)

_, err := db.Exec(createStmt)
require.NoError(t, err)

_, err = db.Exec(insertStmt)
require.NoError(t, err)
}

0 comments on commit 1e01830

Please sign in to comment.