From fb212257b92d3d8f6c6e9fa867abbfe636eb29a6 Mon Sep 17 00:00:00 2001 From: Leonardo Cecchi Date: Thu, 15 Feb 2024 12:17:33 +0100 Subject: [PATCH] feat: implementation of the Backup service (#7) Signed-off-by: Leonardo Cecchi Signed-off-by: Armando Ruocco Co-authored-by: Armando Ruocco --- Dockerfile | 5 + go.mod | 19 +- go.sum | 38 +++- internal/backup/backup.go | 94 ++++++++ internal/backup/doc.go | 2 + internal/backup/executor/constants.go | 3 + internal/backup/executor/controldata.go | 82 +++++++ internal/backup/executor/doc.go | 2 + internal/backup/executor/executor.go | 275 ++++++++++++++++++++++++ internal/backup/executor/repository.go | 157 ++++++++++++++ internal/backup/storage/doc.go | 3 + internal/backup/storage/storage.go | 65 ++++++ internal/fileutils/checks.go | 21 ++ internal/identity/impl.go | 7 + internal/operator/mutations.go | 7 +- internal/operator/spec.go | 38 +++- internal/operator/validation.go | 20 +- internal/wal/status.go | 19 +- internal/wal/utils.go | 42 ---- internal/wal/wal.go | 34 ++- kubernetes/backup-example.yaml | 11 + kubernetes/cluster-example.yaml | 22 +- main.go | 3 + pkg/pluginhelper/helper.go | 68 +++--- pkg/pluginhelper/server.go | 2 +- 25 files changed, 915 insertions(+), 124 deletions(-) create mode 100644 internal/backup/backup.go create mode 100644 internal/backup/doc.go create mode 100644 internal/backup/executor/constants.go create mode 100644 internal/backup/executor/controldata.go create mode 100644 internal/backup/executor/doc.go create mode 100644 internal/backup/executor/executor.go create mode 100644 internal/backup/executor/repository.go create mode 100644 internal/backup/storage/doc.go create mode 100644 internal/backup/storage/storage.go create mode 100644 internal/fileutils/checks.go delete mode 100644 internal/wal/utils.go create mode 100644 kubernetes/backup-example.yaml diff --git a/Dockerfile b/Dockerfile index d9a0fbc..e8c4d2d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,6 +12,11 @@ RUN --mount=type=cache,target=/root/.cache/go-build ./scripts/build.sh # Step 2: build the image to be actually run FROM alpine:3.18.4 +RUN mkdir /tmp/kopia-dist && \ + wget -o /tmp/kopia-dist/kopia-0.15.0-linux-x64.tar.gz https://github.com/kopia/kopia/releases/download/v0.15.0/kopia-0.15.0-linux-x64.tar.gz && \ + tar -C /tmp/kopia-dist -xvzf kopia-0.15.0-linux-x64.tar.gz && \ + cp /tmp/kopia-dist/kopia-0.15.0-linux-x64/kopia /usr/bin/kopia && \ + rm -rf /tmp/kopia-dist USER 10001:10001 COPY --from=builder /app/bin/plugin-pvc-backup /app/bin/plugin-pvc-backup ENTRYPOINT ["/app/bin/plugin-pvc-backup"] \ No newline at end of file diff --git a/go.mod b/go.mod index 5c3d6ea..3442548 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ toolchain go1.21.6 require ( github.com/cloudnative-pg/cloudnative-pg v1.22.1-0.20240123130737-a22a155b9eb8 - github.com/cloudnative-pg/cnpg-i v0.0.0-20240124144003-4c0a1ac46426 + github.com/cloudnative-pg/cnpg-i v0.0.0-20240202130713-14050b29b7a2 github.com/evanphx/json-patch/v5 v5.8.1 github.com/go-logr/logr v1.3.0 github.com/go-logr/zapr v1.2.4 @@ -16,10 +16,13 @@ require ( go.uber.org/zap v1.26.0 google.golang.org/grpc v1.60.1 k8s.io/api v0.28.4 + k8s.io/apimachinery v0.28.4 + k8s.io/client-go v0.28.4 ) require ( github.com/beorn7/perks v1.0.1 // indirect + github.com/blang/semver v3.5.1+incompatible // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect @@ -33,17 +36,24 @@ require ( github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/google/pprof v0.0.0-20230602150820-91b7bce49751 // indirect github.com/google/uuid v1.5.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx/v5 v5.5.1 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kubernetes-csi/external-snapshotter/client/v6 v6.3.0 // indirect github.com/lib/pq v1.10.9 // indirect github.com/magiconair/properties v1.8.6 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/spdystream v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -54,7 +64,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.70.0 // indirect github.com/prometheus/client_golang v1.17.0 // indirect - github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect + github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.1 // indirect github.com/robfig/cron v1.2.0 // indirect @@ -63,10 +73,13 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.4.1 // indirect + go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect + golang.org/x/crypto v0.18.0 // indirect golang.org/x/exp v0.0.0-20231219160207-73b9e39aefca // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/oauth2 v0.13.0 // indirect + golang.org/x/sync v0.5.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/term v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect @@ -80,8 +93,6 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.28.4 // indirect - k8s.io/apimachinery v0.28.4 // indirect - k8s.io/client-go v0.28.4 // indirect k8s.io/component-base v0.28.4 // indirect k8s.io/klog/v2 v2.110.1 // indirect k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect diff --git a/go.sum b/go.sum index 256fbb3..55989db 100644 --- a/go.sum +++ b/go.sum @@ -38,11 +38,15 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DATA-DOG/go-sqlmock v1.5.1 h1:FK6RCIUSfmbnI/imIICmboyQBkOckutaa6R5YYlLZyo= +github.com/DATA-DOG/go-sqlmock v1.5.1/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ= +github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -52,10 +56,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudnative-pg/cloudnative-pg v1.22.1-0.20240123130737-a22a155b9eb8 h1://2N4Zt2HER9w9l/HMbJheplJTNBgh8FlZ1mSrC2DJQ= github.com/cloudnative-pg/cloudnative-pg v1.22.1-0.20240123130737-a22a155b9eb8/go.mod h1:r6blheO2ihiuqKbk6rqPN5//PPJnYtKCGT2OxpXtk2o= -github.com/cloudnative-pg/cnpg-i v0.0.0-20240122164555-5215ff219c8f h1:ypwPq45y8ezzwxUTHL0VkzkT2+pcHnE4yRoeGTP8fp8= -github.com/cloudnative-pg/cnpg-i v0.0.0-20240122164555-5215ff219c8f/go.mod h1:0G5GXQVj09KvONIcYURyroL74zOFGjv4eI5OXz7/G/0= -github.com/cloudnative-pg/cnpg-i v0.0.0-20240124144003-4c0a1ac46426 h1:eW94u+AQoFR+KDyIenekcHWCE6Kc48mo8CgGB+VOzKU= -github.com/cloudnative-pg/cnpg-i v0.0.0-20240124144003-4c0a1ac46426/go.mod h1:0G5GXQVj09KvONIcYURyroL74zOFGjv4eI5OXz7/G/0= +github.com/cloudnative-pg/cnpg-i v0.0.0-20240202130713-14050b29b7a2 h1:7Cow1BF5rM3k7q+QYjJsPiYGZTK8w+uTGa4VZ3IbBpk= +github.com/cloudnative-pg/cnpg-i v0.0.0-20240202130713-14050b29b7a2/go.mod h1:0G5GXQVj09KvONIcYURyroL74zOFGjv4eI5OXz7/G/0= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -162,8 +164,8 @@ github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20230602150820-91b7bce49751 h1:hR7/MlvK23p6+lIw9SN1TigNLn9ZnF3W4SYRKq2gAHs= +github.com/google/pprof v0.0.0-20230602150820-91b7bce49751/go.mod h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= @@ -184,12 +186,22 @@ github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.1 h1:5I9etrGkLrN+2XPCsi6XLlV5DITbSL/xBZdmAxFcXPI= +github.com/jackc/pgx/v5 v5.5.1/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= @@ -211,6 +223,8 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0 github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc= +github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= @@ -241,8 +255,8 @@ github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.70.0/g github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM= -github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= @@ -280,6 +294,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.4.1 h1:jyEFiXpy21Wm81FBN71l9VoMMV8H8jG+qIK3GCpY6Qs= github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= +github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw= +github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -293,6 +309,8 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= @@ -310,6 +328,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -405,6 +425,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/backup/backup.go b/internal/backup/backup.go new file mode 100644 index 0000000..6046d88 --- /dev/null +++ b/internal/backup/backup.go @@ -0,0 +1,94 @@ +package backup + +import ( + "context" + "time" + + "github.com/cloudnative-pg/cnpg-i/pkg/backup" + + "github.com/cloudnative-pg/plugin-pvc-backup/internal/backup/executor" + "github.com/cloudnative-pg/plugin-pvc-backup/internal/backup/storage" + "github.com/cloudnative-pg/plugin-pvc-backup/pkg/logging" + "github.com/cloudnative-pg/plugin-pvc-backup/pkg/metadata" + "github.com/cloudnative-pg/plugin-pvc-backup/pkg/pluginhelper" +) + +// Implementation is the implementation of the identity service +type Implementation struct { + backup.BackupServer +} + +// GetCapabilities gets the capabilities of the Backup service +func (Implementation) GetCapabilities( + context.Context, + *backup.BackupCapabilitiesRequest, +) (*backup.BackupCapabilitiesResult, error) { + return &backup.BackupCapabilitiesResult{ + Capabilities: []*backup.BackupCapability{ + { + Type: &backup.BackupCapability_Rpc{ + Rpc: &backup.BackupCapability_RPC{ + Type: backup.BackupCapability_RPC_TYPE_BACKUP, + }, + }, + }, + }, + }, nil +} + +// Backup take a physical backup using Kopia +func (Implementation) Backup( + ctx context.Context, + request *backup.BackupRequest, +) (*backup.BackupResult, error) { + contextLogger := logging.FromContext(ctx) + + helper, err := pluginhelper.NewDataBuilder(metadata.Data.Name, request.ClusterDefinition).Build() + if err != nil { + contextLogger.Error(err, "Error while decoding cluster definition from CNPG") + return nil, err + } + + backupObject, err := helper.DecodeBackup(request.BackupDefinition) + if err != nil { + contextLogger.Error(err, "Error while decoding backup definition from CNPG") + return nil, err + } + + cluster := helper.GetCluster() + rep, err := executor.NewRepository( + ctx, + storage.GetBasePath(cluster.Name), + storage.GetKopiaConfigFilePath(cluster.Name), + storage.GetKopiaCacheDirectory(cluster.Name), + ) + if err != nil { + return nil, err + } + + exec := executor.NewLocalExecutor( + cluster, + backupObject, + rep, + ) + + startedAt := time.Now() + backupInfo, err := exec.TakeBackup(ctx) + if err != nil { + return nil, err + } + + return &backup.BackupResult{ + BackupId: backupInfo.BackupName, + BackupName: backupInfo.BackupName, + StartedAt: startedAt.Unix(), + StoppedAt: time.Now().Unix(), + BeginWal: exec.GetBeginWal(), + EndWal: exec.GetEndWal(), + BeginLsn: string(backupInfo.BeginLSN), + EndLsn: string(backupInfo.EndLSN), + BackupLabelFile: backupInfo.LabelFile, + TablespaceMapFile: backupInfo.SpcmapFile, + Online: true, + }, nil +} diff --git a/internal/backup/doc.go b/internal/backup/doc.go new file mode 100644 index 0000000..9dd234b --- /dev/null +++ b/internal/backup/doc.go @@ -0,0 +1,2 @@ +// Package backup contain the implementation of the Backup service +package backup diff --git a/internal/backup/executor/constants.go b/internal/backup/executor/constants.go new file mode 100644 index 0000000..d6db917 --- /dev/null +++ b/internal/backup/executor/constants.go @@ -0,0 +1,3 @@ +package executor + +const podIP = "127.0.0.1" diff --git a/internal/backup/executor/controldata.go b/internal/backup/executor/controldata.go new file mode 100644 index 0000000..3cd5289 --- /dev/null +++ b/internal/backup/executor/controldata.go @@ -0,0 +1,82 @@ +package executor + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "time" + + "github.com/cloudnative-pg/cloudnative-pg/pkg/management/url" + "github.com/cloudnative-pg/cloudnative-pg/pkg/utils" + + "github.com/cloudnative-pg/plugin-pvc-backup/pkg/logging" +) + +// getPgControlData obtains the pg_controldata from the instance HTTP endpoint +func getPgControlData( + ctx context.Context, +) (map[string]string, error) { + contextLogger := logging.FromContext(ctx) + + const ( + connectionTimeout = 2 * time.Second + requestTimeout = 30 * time.Second + ) + + // We want a connection timeout to prevent waiting for the default + // TCP connection timeout (30 seconds) on lost SYN packets + timeoutClient := &http.Client{ + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: connectionTimeout, + }).DialContext, + }, + Timeout: requestTimeout, + } + + httpURL := url.Build(podIP, url.PathPGControlData, url.StatusPort) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, httpURL, nil) + if err != nil { + return nil, err + } + + resp, err := timeoutClient.Do(req) + if err != nil { + return nil, err + } + + defer func() { + if err := resp.Body.Close(); err != nil { + contextLogger.Error(err, "while closing body") + } + }() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + contextLogger.Info("Error while querying the pg_controldata endpoint", + "statusCode", resp.StatusCode, + "body", string(body)) + return nil, fmt.Errorf("error while querying the pg_controldata endpoint: %d", resp.StatusCode) + } + + type pgControldataResponse struct { + Data string `json:"data,omitempty"` + Error error `json:"error,omitempty"` + } + + var result pgControldataResponse + err = json.Unmarshal(body, &result) + if err != nil { + result.Error = err + return nil, err + } + + return utils.ParsePgControldataOutput(result.Data), result.Error +} diff --git a/internal/backup/executor/doc.go b/internal/backup/executor/doc.go new file mode 100644 index 0000000..803021b --- /dev/null +++ b/internal/backup/executor/doc.go @@ -0,0 +1,2 @@ +// Package executor contains the logic of taking a backup +package executor diff --git a/internal/backup/executor/executor.go b/internal/backup/executor/executor.go new file mode 100644 index 0000000..e4bf2a6 --- /dev/null +++ b/internal/backup/executor/executor.go @@ -0,0 +1,275 @@ +package executor + +import ( + "context" + "fmt" + "io/fs" + "os" + "path" + "time" + + apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" + "github.com/cloudnative-pg/cloudnative-pg/pkg/management/postgres/webserver" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" + + "github.com/cloudnative-pg/plugin-pvc-backup/pkg/logging" +) + +var ( + errBackupNotStarted = fmt.Errorf("backup not started") + errBackupNotStopped = fmt.Errorf("backup not stopped") +) + +var backupModeBackoff = wait.Backoff{ + Steps: 10, + Duration: 1 * time.Second, + Factor: 5.0, + Jitter: 0.1, +} + +// Executor manages the execution of a backup +type Executor struct { + backupClient webserver.BackupClient + + beginWal string + endWal string + + cluster *apiv1.Cluster + backup *apiv1.Backup + repository *Repository + backupClientEndpoint string + + executed bool +} + +// GetBeginWal returns the beginWal value, panics if the executor was not executed +func (executor *Executor) GetBeginWal() string { + if !executor.executed { + panic("beginWal: please run take backup before trying to access this value") + } + return executor.beginWal +} + +// GetEndWal returns the endWal value, panics if the executor was not executed +func (executor *Executor) GetEndWal() string { + if !executor.executed { + panic("endWal: please run take backup before trying to access this value") + } + return executor.endWal +} + +// tablespace represent a tablespace location +type tablespace struct { + // path is the path where the tablespaces data is stored + path string + + // oid is the OID of the tablespace inside the database + oid string +} + +// newExecutor creates a new backup Executor +func newExecutor(cluster *apiv1.Cluster, backup *apiv1.Backup, repo *Repository, endpoint string) *Executor { + return &Executor{ + backupClient: webserver.NewBackupClient(), + cluster: cluster, + backup: backup, + repository: repo, + backupClientEndpoint: endpoint, + } +} + +// NewLocalExecutor creates a new backup Executor +func NewLocalExecutor(cluster *apiv1.Cluster, backup *apiv1.Backup, repo *Repository) *Executor { + return newExecutor(cluster, backup, repo, podIP) +} + +// TakeBackup executes a backup. Returns the result and any error encountered +func (executor *Executor) TakeBackup(ctx context.Context) (*webserver.BackupResultData, error) { + defer func() { + executor.executed = true + }() + + contextLogger := logging.FromContext(ctx) + contextLogger.Info("Preparing physical backup") + if err := executor.setBackupMode(ctx); err != nil { + return nil, err + } + + contextLogger.Info("Copying files") + if err := executor.execSnapshot(ctx); err != nil { + return nil, err + } + + contextLogger.Info("Finishing backup") + return executor.unsetBackupMode(ctx) +} + +// setBackupMode starts a backup by setting PostgreSQL in backup mode +func (executor *Executor) setBackupMode(ctx context.Context) error { + logger := logging.FromContext(ctx) + + var currentWALErr error + executor.beginWal, currentWALErr = executor.getCurrentWALFile(ctx) + if currentWALErr != nil { + return currentWALErr + } + + if err := executor.backupClient.Start(ctx, executor.backupClientEndpoint, webserver.StartBackupRequest{ + ImmediateCheckpoint: true, + WaitForArchive: true, + BackupName: executor.backup.GetName(), + Force: true, + }); err != nil { + logger.Error(err, "while requesting new backup on PostgreSQL") + return err + } + + logger.Info("Requesting PostgreSQL Backup mode") + if err := retry.OnError(backupModeBackoff, retryOnBackupNotStarted, func() error { + response, err := executor.backupClient.StatusWithErrors(ctx, executor.backupClientEndpoint) + if err != nil { + return err + } + + if response.Data.Phase != webserver.Started { + logger.V(4).Info("Backup still not started", "status", response.Data) + return errBackupNotStarted + } + + return nil + }); err != nil { + return err + } + + logger.Info("Backup Mode started") + return nil +} + +// execSnapshot takes the snapshot of the data directory and the tablespace folder +func (executor *Executor) execSnapshot(ctx context.Context) error { + const snapshotTablespaceOidName = "oid" + + const ( + snapshotTypeName = "type" + snapshotTypeBase = "base" + snapshotTypeTablespace = "tablespace" + ) + + logger := logging.FromContext(ctx) + + tablespaces, err := executor.getTablespaces(ctx) + if err != nil { + return err + } + + logger.Info("Taking snapshot of data directory") + err = executor.repository.takeSnapshot(ctx, pgDataLocation, map[string]string{ + snapshotTypeName: snapshotTypeBase, + }) + if err != nil { + return err + } + + for i := range tablespaces { + logger.Info("Taking snapshot of tablespace", "tablespace", tablespaces[i]) + err := executor.repository.takeSnapshot(ctx, tablespaces[i].path, map[string]string{ + snapshotTypeName: snapshotTypeTablespace, + snapshotTablespaceOidName: tablespaces[i].oid, + }) + if err != nil { + return err + } + } + + return nil +} + +// GetTablespaces read the list of tablespaces +func (*Executor) getTablespaces(ctx context.Context) ([]tablespace, error) { + logger := logging.FromContext(ctx) + + tblFolder := path.Join(pgDataLocation, tablespacesFolder) + entries, err := os.ReadDir(tblFolder) + if err != nil { + return nil, err + } + result := make([]tablespace, 0, len(entries)) + + for i := range entries { + fullPath, err := os.Readlink(path.Join(tblFolder, entries[i].Name())) + if err != nil { + logger.Error(err, "Error while reading tablespace link") + return nil, err + } + + if (entries[i].Type() & fs.ModeSymlink) != 0 { + result = append(result, tablespace{ + oid: entries[i].Name(), + path: fullPath, + }) + } + } + + return result, nil +} + +// unsetBackupMode stops a backup and resume PostgreSQL normal operation +func (executor *Executor) unsetBackupMode(ctx context.Context) (*webserver.BackupResultData, error) { + logger := logging.FromContext(ctx) + + if err := executor.backupClient.Stop(ctx, executor.backupClientEndpoint, webserver.StopBackupRequest{ + BackupName: executor.backup.GetName(), + }); err != nil { + logger.Error(err, "while requesting new backup on PostgreSQL") + return nil, err + } + + logger.Info("Stopping PostgreSQL Backup mode") + var backupStatus webserver.BackupResultData + if err := retry.OnError(backupModeBackoff, retryOnBackupNotStopped, func() error { + response, err := executor.backupClient.StatusWithErrors(ctx, executor.backupClientEndpoint) + if err != nil { + return err + } + + if response.Data.Phase != webserver.Completed { + logger.V(4).Info("backup still not stopped", "status", response.Data) + return errBackupNotStopped + } + + backupStatus = *response.Data + + return nil + }); err != nil { + return nil, err + } + logger.Info("PostgreSQL Backup mode stopped") + + var err error + executor.endWal, err = executor.getCurrentWALFile(ctx) + if err != nil { + return nil, err + } + + return &backupStatus, nil +} + +func retryOnBackupNotStarted(e error) bool { + return e == errBackupNotStarted +} + +func retryOnBackupNotStopped(e error) bool { + return e == errBackupNotStopped +} + +func (executor *Executor) getCurrentWALFile(ctx context.Context) (string, error) { + const currentWALFileControlFile = "Latest checkpoint's REDO WAL file" + + controlDataOutput, err := getPgControlData(ctx) + if err != nil { + return "", err + } + + return controlDataOutput[currentWALFileControlFile], nil +} diff --git a/internal/backup/executor/repository.go b/internal/backup/executor/repository.go new file mode 100644 index 0000000..0080e0f --- /dev/null +++ b/internal/backup/executor/repository.go @@ -0,0 +1,157 @@ +package executor + +import ( + "context" + "fmt" + "os/exec" + "path" + + "github.com/cloudnative-pg/plugin-pvc-backup/internal/fileutils" + "github.com/cloudnative-pg/plugin-pvc-backup/pkg/logging" +) + +const ( + pgDataLocation = "/var/lib/postgresql/data/pgdata" + tablespacesFolder = "pg_tblspc" + walFolder = "pg_wal" +) + +// Repository represents a backup repository where +// base directories are stored +type Repository struct { + path string + cacheDirectory string + configFile string +} + +// NewRepository creates a new repository in a certain +// path, ensuring that the repository is initialized and +// ready to accept backups +func NewRepository(ctx context.Context, path string, configFile string, cacheDirectory string) (*Repository, error) { + result := &Repository{ + path: path, + configFile: configFile, + cacheDirectory: cacheDirectory, + } + + // We initialize the repository if it is not initialized + ok, err := fileutils.IsDir(path) + if err != nil { + return nil, err + } + + if !ok { + err = result.initializeRepository(ctx) + if err != nil { + return nil, err + } + } + + return result, nil +} + +func (repo *Repository) initializeRepository(ctx context.Context) error { + logger := logging.FromContext(ctx) + + args := []string{ + "kopia", + "Repository", + "create", + "filesystem", + fmt.Sprintf("--path=%s", repo.path), + fmt.Sprintf("--config-file=%s", repo.configFile), + fmt.Sprintf("--log-dir=%s/log", repo.cacheDirectory), + fmt.Sprintf("--cache-directory=%s", repo.cacheDirectory), + } + + cmd := exec.CommandContext(ctx, args[0], args[1:]...) // nolint:gosec + output, err := cmd.CombinedOutput() + if err != nil { + logger.Error( + err, + "Error invoking kopia create filesystem command", + "args", args, + "output", string(output)) + return err + } + + return repo.configureIgnoreFolders(ctx) +} + +func (repo *Repository) configureIgnoreFolders(ctx context.Context) error { + if err := repo.addIgnoreFolder(ctx, path.Join(pgDataLocation, walFolder)); err != nil { + return err + } + + if err := repo.addIgnoreFolder(ctx, path.Join(pgDataLocation, tablespacesFolder)); err != nil { + return err + } + + return nil +} + +func (repo *Repository) addIgnoreFolder(ctx context.Context, folder string) error { + logger := logging.FromContext(ctx) + + args := []string{ + "kopia", + "policy", + "set", + folder, + fmt.Sprintf("--log-dir=%s/log", repo.cacheDirectory), + "--add-ignore=.", + fmt.Sprintf("--config-file=%s", repo.configFile), + } + + cmd := exec.CommandContext(ctx, args[0], args[1:]...) // nolint:gosec + output, err := cmd.CombinedOutput() + if err != nil { + logger.Error( + err, + "Error invoking kopia policy set command", + "args", args, + "output", string(output)) + return err + } + + return nil +} + +// takeSnapshot takes a Kopia snapshot of a certain path, adding a set of tags +func (repo *Repository) takeSnapshot(ctx context.Context, path string, tags map[string]string) error { + logger := logging.FromContext(ctx) + + args := []string{ + "kopia", + "snapshot", + "create", + fmt.Sprintf("--log-dir=%s/log", repo.cacheDirectory), + fmt.Sprintf("--config-file=%s", repo.configFile), + path, + } + + tagsOption := "" + for k, v := range tags { + if len(tagsOption) > 0 { + tagsOption += "," + } + tagsOption += fmt.Sprintf("%s:%v", k, v) + } + + if len(tagsOption) > 0 { + args = append(args, "--tags="+tagsOption) + } + + cmd := exec.CommandContext(ctx, args[0], args[1:]...) // nolint:gosec + output, err := cmd.CombinedOutput() + if err != nil { + logger.Error( + err, + "Error invoking kopia snapshot create command", + "args", args, + "output", string(output)) + return err + } + + return nil +} diff --git a/internal/backup/storage/doc.go b/internal/backup/storage/doc.go new file mode 100644 index 0000000..af3fe7f --- /dev/null +++ b/internal/backup/storage/doc.go @@ -0,0 +1,3 @@ +// Package storage exposes a set of useful function +// to control the /backups folder +package storage diff --git a/internal/backup/storage/storage.go b/internal/backup/storage/storage.go new file mode 100644 index 0000000..6ed666d --- /dev/null +++ b/internal/backup/storage/storage.go @@ -0,0 +1,65 @@ +package storage + +import "path" + +const ( + basePath = "/backup" + walsDirectory = "wals" + baseDirectory = "base" +) + +func getWalPrefix(walName string) string { + return walName[0:16] +} + +// getClusterPath gets the path where the files relative +// to a cluster are stored +func getClusterPath(clusterName string) string { + return path.Join(basePath, clusterName) +} + +// GetWALPath gets the path where the WALs relative +// to a cluster are stored +func GetWALPath(clusterName string) string { + return path.Join( + getClusterPath(clusterName), + walsDirectory, + ) +} + +// GetKopiaConfigFilePath gets the path where the +// kopia configuration file will be written +func GetKopiaConfigFilePath(clusterName string) string { + return path.Join( + getClusterPath(clusterName), + ".kopia.config", + ) +} + +// GetKopiaCacheDirectory gets the path where the +// kopia cache will be written +func GetKopiaCacheDirectory(clusterName string) string { + return path.Join( + getClusterPath(clusterName), + ".kopia.cache", + ) +} + +// GetBasePath gets the path where the WALs relative +// to a cluster are stored +func GetBasePath(clusterName string) string { + return path.Join( + getClusterPath(clusterName), + baseDirectory, + ) +} + +// GetWALFilePath gets the path where a certain WAL file +// should be stored +func GetWALFilePath(clusterName string, walName string) string { + return path.Join( + GetWALPath(clusterName), + getWalPrefix(walName), + walName, + ) +} diff --git a/internal/fileutils/checks.go b/internal/fileutils/checks.go new file mode 100644 index 0000000..f555ae2 --- /dev/null +++ b/internal/fileutils/checks.go @@ -0,0 +1,21 @@ +package fileutils + +import ( + "os" +) + +// IsDir checks if a path points to an existing directory +func IsDir(path string) (bool, error) { + fileInfo, err := os.Stat(path) + if os.IsNotExist(err) { + return false, nil + } + if err != nil { + return false, err + } + + if fileInfo.Mode().IsDir() { + return true, nil + } + return false, nil +} diff --git a/internal/identity/impl.go b/internal/identity/impl.go index 3c1b4da..440b1c6 100644 --- a/internal/identity/impl.go +++ b/internal/identity/impl.go @@ -58,6 +58,13 @@ func (Implementation) GetPluginCapabilities( }, }, }, + { + Type: &identity.PluginCapability_Service_{ + Service: &identity.PluginCapability_Service{ + Type: identity.PluginCapability_Service_TYPE_BACKUP_SERVICE, + }, + }, + }, }, }, nil } diff --git a/internal/operator/mutations.go b/internal/operator/mutations.go index 532f54c..125f340 100644 --- a/internal/operator/mutations.go +++ b/internal/operator/mutations.go @@ -32,7 +32,7 @@ func (Implementation) MutateCluster( _ context.Context, request *operator.OperatorMutateClusterRequest, ) (*operator.OperatorMutateClusterResult, error) { - helper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.Definition) + helper, err := pluginhelper.NewDataBuilder(metadata.Data.Name, request.Definition).Build() if err != nil { return nil, err } @@ -67,7 +67,8 @@ func (Implementation) MutatePod( _ context.Context, request *operator.OperatorMutatePodRequest, ) (*operator.OperatorMutatePodResult, error) { - helper, err := pluginhelper.NewFromClusterAndPod(metadata.Data.Name, request.ClusterDefinition, request.PodDefinition) + helper, err := pluginhelper.NewDataBuilder(metadata.Data.Name, request.ClusterDefinition). + WithPod(request.PodDefinition).Build() if err != nil { return nil, err } @@ -79,7 +80,7 @@ func (Implementation) MutatePod( if len(mutatedPod.Spec.Containers) > 0 { mutatedPod.Spec.Containers = append( mutatedPod.Spec.Containers, - getSidecarContainer(helper.Parameters)) + getSidecarContainer(mutatedPod, helper.Parameters)) } // Inject backup volume diff --git a/internal/operator/spec.go b/internal/operator/spec.go index 66a2e19..62da174 100644 --- a/internal/operator/spec.go +++ b/internal/operator/spec.go @@ -16,10 +16,16 @@ limitations under the License. package operator -import corev1 "k8s.io/api/core/v1" +import ( + "strings" -func getSidecarContainer(parameters map[string]string) corev1.Container { - return corev1.Container{ + corev1 "k8s.io/api/core/v1" +) + +const pgPath = "/var/lib/postgresql" + +func getSidecarContainer(pgPod *corev1.Pod, parameters map[string]string) corev1.Container { + result := corev1.Container{ Name: "plugin-pvc-backup", VolumeMounts: []corev1.VolumeMount{ { @@ -34,14 +40,32 @@ func getSidecarContainer(parameters map[string]string) corev1.Container { Name: "backups", MountPath: "/backup", }, - { - Name: "pgdata", - MountPath: "/var/lib/postgresql/data", - }, }, Image: parameters["image"], ImagePullPolicy: corev1.PullPolicy(parameters[imagePullPolicyParameter]), + Env: []corev1.EnvVar{ + { + Name: "KOPIA_PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: parameters["secretName"], + }, + Key: parameters["secretKey"], + }, + }, + }, + }, } + + volumeMounts := pgPod.Spec.Containers[0].VolumeMounts + for i := range volumeMounts { + if strings.HasPrefix(volumeMounts[i].MountPath, pgPath) { + result.VolumeMounts = append(result.VolumeMounts, volumeMounts[i]) + } + } + + return result } func getBackupVolume(parameters map[string]string) corev1.Volume { diff --git a/internal/operator/validation.go b/internal/operator/validation.go index dd49243..aa1b64e 100644 --- a/internal/operator/validation.go +++ b/internal/operator/validation.go @@ -30,6 +30,8 @@ const ( imagePullPolicyParameter = "imagePullPolicy" imageNameParameter = "image" pvcNameParameter = "pvc" + secretNameParameter = "secretName" + secretKeyParameter = "secretKey" ) // ValidateClusterCreate validates a cluster that is being created @@ -39,7 +41,7 @@ func (Implementation) ValidateClusterCreate( ) (*operator.OperatorValidateClusterCreateResult, error) { result := &operator.OperatorValidateClusterCreateResult{} - helper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.Definition) + helper, err := pluginhelper.NewDataBuilder(metadata.Data.Name, request.Definition).Build() if err != nil { return nil, err } @@ -56,12 +58,12 @@ func (Implementation) ValidateClusterChange( ) (*operator.OperatorValidateClusterChangeResult, error) { result := &operator.OperatorValidateClusterChangeResult{} - oldClusterHelper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.OldCluster) + oldClusterHelper, err := pluginhelper.NewDataBuilder(metadata.Data.Name, request.OldCluster).Build() if err != nil { return nil, fmt.Errorf("while parsing old cluster: %w", err) } - newClusterHelper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.NewCluster) + newClusterHelper, err := pluginhelper.NewDataBuilder(metadata.Data.Name, request.NewCluster).Build() if err != nil { return nil, fmt.Errorf("while parsing new cluster: %w", err) } @@ -92,5 +94,17 @@ func validateParameters(helper *pluginhelper.Data) []*operator.ValidationError { helper.ValidationErrorForParameter(imageNameParameter, "cannot be empty")) } + if len(helper.Parameters[secretNameParameter]) == 0 { + result = append( + result, + helper.ValidationErrorForParameter(secretNameParameter, "cannot be empty")) + } + + if len(helper.Parameters[secretKeyParameter]) == 0 { + result = append( + result, + helper.ValidationErrorForParameter(secretKeyParameter, "cannot be empty")) + } + return result } diff --git a/internal/wal/status.go b/internal/wal/status.go index f23be26..cdf62be 100644 --- a/internal/wal/status.go +++ b/internal/wal/status.go @@ -25,6 +25,7 @@ import ( "github.com/cloudnative-pg/cnpg-i/pkg/wal" + "github.com/cloudnative-pg/plugin-pvc-backup/internal/backup/storage" "github.com/cloudnative-pg/plugin-pvc-backup/pkg/logging" "github.com/cloudnative-pg/plugin-pvc-backup/pkg/metadata" "github.com/cloudnative-pg/plugin-pvc-backup/pkg/pluginhelper" @@ -42,35 +43,35 @@ func (Implementation) Status( ctx context.Context, request *wal.WALStatusRequest, ) (*wal.WALStatusResult, error) { - logging := logging.FromContext(ctx) + contextLogger := logging.FromContext(ctx) - helper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.ClusterDefinition) + helper, err := pluginhelper.NewDataBuilder(metadata.Data.Name, request.ClusterDefinition).Build() if err != nil { - logging.Error(err, "Error while decoding cluster definition from CNPG") + contextLogger.Error(err, "Error while decoding cluster definition from CNPG") return nil, err } - walPath := getWALPath(helper.GetCluster().Name) - logging = logging.WithValues( + walPath := storage.GetWALPath(helper.GetCluster().Name) + contextLogger = contextLogger.WithValues( "walPath", walPath, "clusterName", helper.GetCluster().Name, ) walDirEntries, err := os.ReadDir(walPath) if err != nil { - logging.Error(err, "Error while reading WALs directory") + contextLogger.Error(err, "Error while reading WALs directory") return nil, err } firstWal, err := getWALStat(helper.GetCluster().Name, walDirEntries, walStatModeFirst) if err != nil { - logging.Error(err, "Error while reading WALs directory (getting first WAL)") + contextLogger.Error(err, "Error while reading WALs directory (getting first WAL)") return nil, err } lastWal, err := getWALStat(helper.GetCluster().Name, walDirEntries, walStatModeLast) if err != nil { - logging.Error(err, "Error while reading WALs directory (getting first WAL)") + contextLogger.Error(err, "Error while reading WALs directory (getting first WAL)") return nil, err } @@ -90,7 +91,7 @@ func getWALStat(clusterName string, entries []fs.DirEntry, mode walStatMode) (st return "", fmt.Errorf("%s is not a directory", entry) } - entryAbsolutePath := path.Join(getWALPath(clusterName), entry.Name()) + entryAbsolutePath := path.Join(storage.GetWALPath(clusterName), entry.Name()) subFolderEntries, err := os.ReadDir(entryAbsolutePath) if err != nil { return "", fmt.Errorf("while reading %s entries: %w", entry, err) diff --git a/internal/wal/utils.go b/internal/wal/utils.go deleted file mode 100644 index eb80b0b..0000000 --- a/internal/wal/utils.go +++ /dev/null @@ -1,42 +0,0 @@ -/* -Copyright The CloudNativePG Contributors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package wal - -import "path" - -func getWalPrefix(walName string) string { - return walName[0:16] -} - -func getClusterPath(clusterName string) string { - return path.Join(basePath, clusterName) -} - -func getWALPath(clusterName string) string { - return path.Join( - getClusterPath(clusterName), - walsDirectory, - ) -} - -func getWALFilePath(clusterName string, walName string) string { - return path.Join( - getWALPath(clusterName), - getWalPrefix(walName), - walName, - ) -} diff --git a/internal/wal/wal.go b/internal/wal/wal.go index b2ef9f5..da3fefa 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -22,43 +22,39 @@ import ( "github.com/cloudnative-pg/cnpg-i/pkg/wal" + "github.com/cloudnative-pg/plugin-pvc-backup/internal/backup/storage" "github.com/cloudnative-pg/plugin-pvc-backup/internal/fileutils" "github.com/cloudnative-pg/plugin-pvc-backup/pkg/logging" "github.com/cloudnative-pg/plugin-pvc-backup/pkg/metadata" "github.com/cloudnative-pg/plugin-pvc-backup/pkg/pluginhelper" ) -const ( - basePath = "/backup" - walsDirectory = "wals" -) - // Archive copies one WAL file into the archive func (Implementation) Archive( ctx context.Context, request *wal.WALArchiveRequest, ) (*wal.WALArchiveResult, error) { - logging := logging.FromContext(ctx) + contextLogger := logging.FromContext(ctx) - helper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.ClusterDefinition) + helper, err := pluginhelper.NewDataBuilder(metadata.Data.Name, request.ClusterDefinition).Build() if err != nil { - logging.Error(err, "Error while decoding cluster definition from CNPG") + contextLogger.Error(err, "Error while decoding cluster definition from CNPG") return nil, err } walName := path.Base(request.SourceFileName) - destinationPath := getWALFilePath(helper.GetCluster().Name, walName) + destinationPath := storage.GetWALFilePath(helper.GetCluster().Name, walName) - logging = logging.WithValues( + contextLogger = contextLogger.WithValues( "sourceFileName", request.SourceFileName, "destinationPath", destinationPath, "clusterName", helper.GetCluster().Name, ) - logging.Info("Archiving WAL File") + contextLogger.Info("Archiving WAL File") err = fileutils.CopyFile(request.SourceFileName, destinationPath) if err != nil { - logging.Error(err, "Error archiving WAL file") + contextLogger.Error(err, "Error archiving WAL file") } return &wal.WALArchiveResult{}, err @@ -69,26 +65,26 @@ func (Implementation) Restore( ctx context.Context, request *wal.WALRestoreRequest, ) (*wal.WALRestoreResult, error) { - logging := logging.FromContext(ctx) + contextLogger := logging.FromContext(ctx) - helper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.ClusterDefinition) + helper, err := pluginhelper.NewDataBuilder(metadata.Data.Name, request.ClusterDefinition).Build() if err != nil { - logging.Error(err, "Error while decoding cluster definition from CNPG") + contextLogger.Error(err, "Error while decoding cluster definition from CNPG") return nil, err } - walFilePath := getWALFilePath(helper.GetCluster().Name, request.SourceWalName) - logging = logging.WithValues( + walFilePath := storage.GetWALFilePath(helper.GetCluster().Name, request.SourceWalName) + contextLogger = contextLogger.WithValues( "clusterName", helper.GetCluster().Name, "walName", request.SourceWalName, "walFilePath", walFilePath, "destinationPath", request.DestinationFileName, ) - logging.Info("Restoring WAL File") + contextLogger.Info("Restoring WAL File") err = fileutils.CopyFile(walFilePath, request.DestinationFileName) if err != nil { - logging.Info("Restored WAL File", "err", err) + contextLogger.Info("Restored WAL File", "err", err) } return &wal.WALRestoreResult{}, err diff --git a/kubernetes/backup-example.yaml b/kubernetes/backup-example.yaml new file mode 100644 index 0000000..a9096de --- /dev/null +++ b/kubernetes/backup-example.yaml @@ -0,0 +1,11 @@ +apiVersion: postgresql.cnpg.io/v1 +kind: Backup +metadata: + name: backup-example +spec: + cluster: + name: cluster-example + + method: plugin + pluginConfiguration: + name: pvc-backup.cloudnative-pg.io diff --git a/kubernetes/cluster-example.yaml b/kubernetes/cluster-example.yaml index 88992c8..d52e516 100644 --- a/kubernetes/cluster-example.yaml +++ b/kubernetes/cluster-example.yaml @@ -3,7 +3,7 @@ kind: Cluster metadata: name: cluster-example spec: - instances: 3 + instances: 1 plugins: - name: pvc-backup.cloudnative-pg.io @@ -11,6 +11,26 @@ spec: pvc: backups-pvc image: plugin-pvc-backup:latest imagePullPolicy: Never + secretName: kopia-password + secretKey: password + + tablespaces: + - name: atablespace + storage: + size: 1Gi + storageClass: standard + - name: another_tablespace + storage: + size: 2Gi + storageClass: standard storage: size: 1Gi +--- +apiVersion: v1 +data: + password: dGhpc2lzYXJhbmRvbXBhc3N3b3Jk +kind: Secret +metadata: + creationTimestamp: null + name: kopia-password \ No newline at end of file diff --git a/main.go b/main.go index 627c6f8..8ac3e5e 100644 --- a/main.go +++ b/main.go @@ -21,10 +21,12 @@ import ( "fmt" "os" + "github.com/cloudnative-pg/cnpg-i/pkg/backup" "github.com/cloudnative-pg/cnpg-i/pkg/operator" "github.com/cloudnative-pg/cnpg-i/pkg/wal" "google.golang.org/grpc" + backupImpl "github.com/cloudnative-pg/plugin-pvc-backup/internal/backup" "github.com/cloudnative-pg/plugin-pvc-backup/internal/identity" operatorImpl "github.com/cloudnative-pg/plugin-pvc-backup/internal/operator" walImpl "github.com/cloudnative-pg/plugin-pvc-backup/internal/wal" @@ -35,6 +37,7 @@ func main() { cmd := pluginhelper.CreateMainCmd(identity.Implementation{}, func(server *grpc.Server) { operator.RegisterOperatorServer(server, operatorImpl.Implementation{}) wal.RegisterWALServer(server, walImpl.Implementation{}) + backup.RegisterBackupServer(server, backupImpl.Implementation{}) }) err := cmd.Execute() if err != nil { diff --git a/pkg/pluginhelper/helper.go b/pkg/pluginhelper/helper.go index 8f7615e..eaa3287 100644 --- a/pkg/pluginhelper/helper.go +++ b/pkg/pluginhelper/helper.go @@ -42,21 +42,43 @@ type Data struct { pluginIndex int } -// NewFromCluster creates a new validation helper loading -// a cluster definition -func NewFromCluster( - pluginName string, - clusterDefinition []byte, -) (*Data, error) { +// DataBuilder a fluent constructor for the Data struct +type DataBuilder struct { + pluginName string + clusterJSON []byte + podJSON []byte +} + +// NewDataBuilder initializes a basic DataBuilder +func NewDataBuilder(pluginName string, clusterJSON []byte) *DataBuilder { + d := DataBuilder{clusterJSON: clusterJSON, pluginName: pluginName} + d.clusterJSON = clusterJSON + return &d +} + +// WithPod adds Pod data to the DataBuilder +func (d *DataBuilder) WithPod(podJSON []byte) *DataBuilder { + d.podJSON = podJSON + return d +} + +// Build returns the constructed Data object and any errors encountered +func (d *DataBuilder) Build() (*Data, error) { result := &Data{} - if err := json.Unmarshal(clusterDefinition, &result.cluster); err != nil { + if err := json.Unmarshal(d.clusterJSON, &result.cluster); err != nil { return nil, err } + if len(d.podJSON) > 0 { + if err := json.Unmarshal(d.podJSON, &result.pod); err != nil { + return nil, err + } + } + result.pluginIndex = -1 for idx, cfg := range result.cluster.Spec.Plugins { - if cfg.Name == pluginName { + if cfg.Name == d.pluginName { result.pluginIndex = idx result.Parameters = cfg.Parameters } @@ -65,25 +87,6 @@ func NewFromCluster( return result, nil } -// NewFromClusterAndPod creates a new validation helper loading -// a cluster and a Pod definition -func NewFromClusterAndPod( - pluginName string, - clusterDefinition []byte, - podDefinition []byte, -) (*Data, error) { - result, err := NewFromCluster(pluginName, clusterDefinition) - if err != nil { - return nil, err - } - - if err := json.Unmarshal(podDefinition, &result.pod); err != nil { - return nil, err - } - - return result, nil -} - // GetCluster gets the decoded cluster object func (helper *Data) GetCluster() *apiv1.Cluster { return &helper.cluster @@ -184,3 +187,14 @@ func (*Data) InjectPluginVolume(pod *corev1.Pod) { } } } + +// DecodeBackup decodes a JSON representation of a backup +func (*Data) DecodeBackup(backupDefinition []byte) (*apiv1.Backup, error) { + var backup apiv1.Backup + + if err := json.Unmarshal(backupDefinition, &backup); err != nil { + return nil, err + } + + return &backup, nil +} diff --git a/pkg/pluginhelper/server.go b/pkg/pluginhelper/server.go index 5df5b73..087416e 100644 --- a/pkg/pluginhelper/server.go +++ b/pkg/pluginhelper/server.go @@ -45,7 +45,7 @@ type ServerEnricher func(*grpc.Server) func CreateMainCmd(identityImpl identity.IdentityServer, enrichers ...ServerEnricher) *cobra.Command { cmd := &cobra.Command{ Use: "pvc-backup", - PersistentPreRun: func(cmd *cobra.Command, args []string) { + PersistentPreRun: func(cmd *cobra.Command, _ []string) { ctx := logging.IntoContext( cmd.Context(), viper.GetBool("debug"))