From 33c27eef22bdd6c17b3fea541144570f6d0ba794 Mon Sep 17 00:00:00 2001 From: Leonardo Cecchi Date: Thu, 25 Jan 2024 08:37:08 +0000 Subject: [PATCH] feat: initial implementation of the WAL service Signed-off-by: Leonardo Cecchi --- .dockerignore | 4 ++ Dockerfile | 2 +- go.mod | 2 +- go.sum | 2 + internal/fileutils/cp.go | 133 ++++++++++++++++++++++++++++++++++++++ internal/fileutils/doc.go | 2 + internal/wal/doc.go | 3 + internal/wal/impl.go | 51 +++++++++++++++ internal/wal/status.go | 106 ++++++++++++++++++++++++++++++ internal/wal/utils.go | 26 ++++++++ internal/wal/wal.go | 79 ++++++++++++++++++++++ main.go | 3 + 12 files changed, 411 insertions(+), 2 deletions(-) create mode 100644 .dockerignore create mode 100644 internal/fileutils/cp.go create mode 100644 internal/fileutils/doc.go create mode 100644 internal/wal/doc.go create mode 100644 internal/wal/impl.go create mode 100644 internal/wal/status.go create mode 100644 internal/wal/utils.go create mode 100644 internal/wal/wal.go diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..2e718ce --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +cloudnative-pg +bin +.github +.git \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index dfc9280..d9a0fbc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,7 +8,7 @@ RUN go mod download # Compile the application COPY . /app -RUN ./scripts/build.sh +RUN --mount=type=cache,target=/root/.cache/go-build ./scripts/build.sh # Step 2: build the image to be actually run FROM alpine:3.18.4 diff --git a/go.mod b/go.mod index d264341..5c3d6ea 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ toolchain go1.21.6 require ( github.com/cloudnative-pg/cloudnative-pg v1.22.1-0.20240123130737-a22a155b9eb8 - github.com/cloudnative-pg/cnpg-i v0.0.0-20240122164555-5215ff219c8f + github.com/cloudnative-pg/cnpg-i v0.0.0-20240124144003-4c0a1ac46426 github.com/evanphx/json-patch/v5 v5.8.1 github.com/go-logr/logr v1.3.0 github.com/go-logr/zapr v1.2.4 diff --git a/go.sum b/go.sum index c85cfd6..256fbb3 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/cloudnative-pg/cloudnative-pg v1.22.1-0.20240123130737-a22a155b9eb8 h github.com/cloudnative-pg/cloudnative-pg v1.22.1-0.20240123130737-a22a155b9eb8/go.mod h1:r6blheO2ihiuqKbk6rqPN5//PPJnYtKCGT2OxpXtk2o= github.com/cloudnative-pg/cnpg-i v0.0.0-20240122164555-5215ff219c8f h1:ypwPq45y8ezzwxUTHL0VkzkT2+pcHnE4yRoeGTP8fp8= github.com/cloudnative-pg/cnpg-i v0.0.0-20240122164555-5215ff219c8f/go.mod h1:0G5GXQVj09KvONIcYURyroL74zOFGjv4eI5OXz7/G/0= +github.com/cloudnative-pg/cnpg-i v0.0.0-20240124144003-4c0a1ac46426 h1:eW94u+AQoFR+KDyIenekcHWCE6Kc48mo8CgGB+VOzKU= +github.com/cloudnative-pg/cnpg-i v0.0.0-20240124144003-4c0a1ac46426/go.mod h1:0G5GXQVj09KvONIcYURyroL74zOFGjv4eI5OXz7/G/0= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= diff --git a/internal/fileutils/cp.go b/internal/fileutils/cp.go new file mode 100644 index 0000000..7937530 --- /dev/null +++ b/internal/fileutils/cp.go @@ -0,0 +1,133 @@ +package fileutils + +import ( + "bytes" + "fmt" + "io" + "os" + "os/user" + "path/filepath" + "strings" +) + +// This implementation is based on https://github.com/nmrshll/go-cp/blob/master/cp.go + +func replaceHomeFolder(path string) (string, error) { + if !strings.HasPrefix(path, "~") { + return path, nil + } + var buffer bytes.Buffer + usr, err := user.Current() + if err != nil { + return "", err + } + _, err = buffer.WriteString(usr.HomeDir) + if err != nil { + return "", err + } + _, err = buffer.WriteString(strings.TrimPrefix(path, "~")) + if err != nil { + return "", err + } + + return buffer.String(), nil +} + +// AbsolutePath converts a path (relative or absolute) into an absolute one. +// Supports '~' notation for $HOME directory of the current user. +func AbsolutePath(path string) (string, error) { + homeReplaced, err := replaceHomeFolder(path) + if err != nil { + return "", err + } + return filepath.Abs(homeReplaced) +} + +// CopyFile copies a file from src to dst. If src and dst files exist, and are +// the same, then return success. Otherwise, attempt to create a hard link +// between the two files. If that fails, copy the file contents from src to dst. +// Creates any missing directories. Supports '~' notation for $HOME directory of the current user. +func CopyFile(src, dst string) error { + srcAbs, err := AbsolutePath(src) + if err != nil { + return err + } + dstAbs, err := AbsolutePath(dst) + if err != nil { + return err + } + + // open source file + sfi, err := os.Stat(srcAbs) + if err != nil { + return err + } + if !sfi.Mode().IsRegular() { + // cannot copy non-regular files (e.g., directories, + // symlinks, devices, etc.) + return fmt.Errorf("CopyFile: non-regular source file %s (%q)", sfi.Name(), sfi.Mode().String()) + } + + // open dest file + dfi, err := os.Stat(dstAbs) + if err != nil && !os.IsNotExist(err) { + return err + } + + if err != nil { + // file doesn't exist + err := os.MkdirAll(filepath.Dir(dst), 0o750) + if err != nil { + return err + } + } else { + if !(dfi.Mode().IsRegular()) { + return fmt.Errorf("CopyFile: non-regular destination file %s (%q)", dfi.Name(), dfi.Mode().String()) + } + if os.SameFile(sfi, dfi) { + return err + } + } + if err = os.Link(src, dst); err == nil { + return err + } + return copyFileContents(src, dst) +} + +// copyFileContents copies the contents of the file named src to the file named +// by dst. The file will be created if it does not already exist. If the +// destination file exists, all it's contents will be replaced by the contents +// of the source file. +func copyFileContents(src, dst string) error { + // Open the source file for reading + srcFile, err := os.Open(src) // nolint:gosec + if err != nil { + return err + } + defer func() { + _ = srcFile.Close() + }() + + // Open the destination file for writing + dstFile, err := os.Create(dst) // nolint:gosec + if err != nil { + return err + } + // Return any errors that result from closing the destination file + // Will return nil if no errors occurred + defer func() { + cerr := dstFile.Close() + if err == nil { + err = cerr + } + }() + + // Copy the contents of the source file into the destination files + size := 1024 * 1024 + buf := make([]byte, size) + if _, err = io.CopyBuffer(dstFile, srcFile, buf); err != nil { + return err + } + err = dstFile.Sync() + return err +} diff --git a/internal/fileutils/doc.go b/internal/fileutils/doc.go new file mode 100644 index 0000000..c1f65dc --- /dev/null +++ b/internal/fileutils/doc.go @@ -0,0 +1,2 @@ +// Package fileutils contains a set of useful functions to manage files +package fileutils diff --git a/internal/wal/doc.go b/internal/wal/doc.go new file mode 100644 index 0000000..b59d6c2 --- /dev/null +++ b/internal/wal/doc.go @@ -0,0 +1,3 @@ +// Package wal contains the implementation of the +// WAL Manager server +package wal diff --git a/internal/wal/impl.go b/internal/wal/impl.go new file mode 100644 index 0000000..3de78b9 --- /dev/null +++ b/internal/wal/impl.go @@ -0,0 +1,51 @@ +package wal + +import ( + "context" + + "github.com/cloudnative-pg/cnpg-i/pkg/wal" +) + +// Implementation is the implementation of the identity service +type Implementation struct { + wal.WALServer +} + +// GetCapabilities gets the capabilities of the WAL service +func (Implementation) GetCapabilities( + context.Context, + *wal.WALCapabilitiesRequest, +) (*wal.WALCapabilitiesResult, error) { + return &wal.WALCapabilitiesResult{ + Capabilities: []*wal.WALCapability{ + { + Type: &wal.WALCapability_Rpc{ + Rpc: &wal.WALCapability_RPC{ + Type: wal.WALCapability_RPC_TYPE_ARCHIVE_WAL, + }, + }, + }, + { + Type: &wal.WALCapability_Rpc{ + Rpc: &wal.WALCapability_RPC{ + Type: wal.WALCapability_RPC_TYPE_RESTORE_WAL, + }, + }, + }, + { + Type: &wal.WALCapability_Rpc{ + Rpc: &wal.WALCapability_RPC{ + Type: wal.WALCapability_RPC_TYPE_STATUS, + }, + }, + }, + { + Type: &wal.WALCapability_Rpc{ + Rpc: &wal.WALCapability_RPC{ + Type: wal.WALCapability_RPC_TYPE_SET_FIRST_REQUIRED, + }, + }, + }, + }, + }, nil +} diff --git a/internal/wal/status.go b/internal/wal/status.go new file mode 100644 index 0000000..4b92eb1 --- /dev/null +++ b/internal/wal/status.go @@ -0,0 +1,106 @@ +package wal + +import ( + "context" + "fmt" + "io/fs" + "os" + "path" + + "github.com/cloudnative-pg/cnpg-i/pkg/wal" + + "github.com/cloudnative-pg/plugin-pvc-backup/pkg/logging" + "github.com/cloudnative-pg/plugin-pvc-backup/pkg/metadata" + "github.com/cloudnative-pg/plugin-pvc-backup/pkg/pluginhelper" +) + +type walStatMode string + +const ( + walStatModeFirst = "first" + walStatModeLast = "last" +) + +// Status gets the statistics of the WAL file archive +func (Implementation) Status( + ctx context.Context, + request *wal.WALStatusRequest, +) (*wal.WALStatusResult, error) { + logging := logging.FromContext(ctx) + + helper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.ClusterDefinition) + if err != nil { + logging.Error(err, "Error while decoding cluster definition from CNPG") + return nil, err + } + + walPath := getWALPath(helper.GetCluster().Name) + logging = logging.WithValues( + "walPath", walPath, + "clusterName", helper.GetCluster().Name, + ) + + walDirEntries, err := os.ReadDir(walPath) + if err != nil { + logging.Error(err, "Error while reading WALs directory") + return nil, err + } + + firstWal, err := getWALStat(helper.GetCluster().Name, walDirEntries, walStatModeFirst) + if err != nil { + logging.Error(err, "Error while reading WALs directory (getting first WAL)") + return nil, err + } + + lastWal, err := getWALStat(helper.GetCluster().Name, walDirEntries, walStatModeLast) + if err != nil { + logging.Error(err, "Error while reading WALs directory (getting first WAL)") + return nil, err + } + + return &wal.WALStatusResult{ + FirstWal: firstWal, + LastWal: lastWal, + }, nil +} + +func getWALStat(clusterName string, entries []fs.DirEntry, mode walStatMode) (string, error) { + entry, ok := getEntry(entries, mode) + if !ok { + return "", nil + } + + if !entry.IsDir() { + return "", fmt.Errorf("%s is not a directory", entry) + } + + entryAbsolutePath := path.Join(getWALPath(clusterName), entry.Name()) + subFolderEntries, err := os.ReadDir(entryAbsolutePath) + if err != nil { + return "", fmt.Errorf("while reading %s entries: %w", entry, err) + } + + selectSubFolderEntry, ok := getEntry(subFolderEntries, mode) + if !ok { + return "", nil + } + + return selectSubFolderEntry.Name(), nil +} + +func getEntry(entries []fs.DirEntry, mode walStatMode) (fs.DirEntry, bool) { + if len(entries) == 0 { + return nil, false + } + + switch mode { + case walStatModeFirst: + return entries[0], true + + case walStatModeLast: + return entries[len(entries)-1], true + + default: + return nil, false + } +} diff --git a/internal/wal/utils.go b/internal/wal/utils.go new file mode 100644 index 0000000..19dbe97 --- /dev/null +++ b/internal/wal/utils.go @@ -0,0 +1,26 @@ +package wal + +import "path" + +func getWalPrefix(walName string) string { + return walName[0:16] +} + +func getClusterPath(clusterName string) string { + return path.Join(basePath, clusterName) +} + +func getWALPath(clusterName string) string { + return path.Join( + getClusterPath(clusterName), + walsDirectory, + ) +} + +func getWALFilePath(clusterName string, walName string) string { + return path.Join( + getWALPath(clusterName), + getWalPrefix(walName), + walName, + ) +} diff --git a/internal/wal/wal.go b/internal/wal/wal.go new file mode 100644 index 0000000..fe1155d --- /dev/null +++ b/internal/wal/wal.go @@ -0,0 +1,79 @@ +package wal + +import ( + "context" + "path" + + "github.com/cloudnative-pg/cnpg-i/pkg/wal" + + "github.com/cloudnative-pg/plugin-pvc-backup/internal/fileutils" + "github.com/cloudnative-pg/plugin-pvc-backup/pkg/logging" + "github.com/cloudnative-pg/plugin-pvc-backup/pkg/metadata" + "github.com/cloudnative-pg/plugin-pvc-backup/pkg/pluginhelper" +) + +const ( + basePath = "/backup" + walsDirectory = "wals" +) + +// Archive copies one WAL file into the archive +func (Implementation) Archive( + ctx context.Context, + request *wal.WALArchiveRequest, +) (*wal.WALArchiveResult, error) { + logging := logging.FromContext(ctx) + + helper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.ClusterDefinition) + if err != nil { + logging.Error(err, "Error while decoding cluster definition from CNPG") + return nil, err + } + + walName := path.Base(request.SourceFileName) + destinationPath := getWALFilePath(helper.GetCluster().Name, walName) + + logging = logging.WithValues( + "sourceFileName", request.SourceFileName, + "destinationPath", destinationPath, + "clusterName", helper.GetCluster().Name, + ) + + logging.Info("Archiving WAL File") + err = fileutils.CopyFile(request.SourceFileName, destinationPath) + if err != nil { + logging.Error(err, "Error archiving WAL file") + } + + return &wal.WALArchiveResult{}, err +} + +// Restore copies WAL file from the archive to the data directory +func (Implementation) Restore( + ctx context.Context, + request *wal.WALRestoreRequest, +) (*wal.WALRestoreResult, error) { + logging := logging.FromContext(ctx) + + helper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.ClusterDefinition) + if err != nil { + logging.Error(err, "Error while decoding cluster definition from CNPG") + return nil, err + } + + walFilePath := getWALFilePath(helper.GetCluster().Name, request.SourceWalName) + logging = logging.WithValues( + "clusterName", helper.GetCluster().Name, + "walName", request.SourceWalName, + "walFilePath", walFilePath, + "destinationPath", request.DestinationFileName, + ) + + logging.Info("Restoring WAL File") + err = fileutils.CopyFile(walFilePath, request.DestinationFileName) + if err != nil { + logging.Info("Restored WAL File", "err", err) + } + + return &wal.WALRestoreResult{}, err +} diff --git a/main.go b/main.go index 84fe7e5..93e372f 100644 --- a/main.go +++ b/main.go @@ -6,16 +6,19 @@ import ( "os" "github.com/cloudnative-pg/cnpg-i/pkg/operator" + "github.com/cloudnative-pg/cnpg-i/pkg/wal" "google.golang.org/grpc" "github.com/cloudnative-pg/plugin-pvc-backup/internal/identity" operatorImpl "github.com/cloudnative-pg/plugin-pvc-backup/internal/operator" + walImpl "github.com/cloudnative-pg/plugin-pvc-backup/internal/wal" "github.com/cloudnative-pg/plugin-pvc-backup/pkg/pluginhelper" ) func main() { cmd := pluginhelper.CreateMainCmd(identity.Implementation{}, func(server *grpc.Server) { operator.RegisterOperatorServer(server, operatorImpl.Implementation{}) + wal.RegisterWALServer(server, walImpl.Implementation{}) }) err := cmd.Execute() if err != nil {