Skip to content

Commit

Permalink
feat: initial implementation of the WAL service
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Cecchi <[email protected]>
  • Loading branch information
leonardoce authored and mnencia committed Feb 1, 2024
1 parent 3f942ff commit 33c27ee
Show file tree
Hide file tree
Showing 12 changed files with 411 additions and 2 deletions.
4 changes: 4 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cloudnative-pg
bin
.github
.git
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ RUN go mod download

# Compile the application
COPY . /app
RUN ./scripts/build.sh
RUN --mount=type=cache,target=/root/.cache/go-build ./scripts/build.sh

# Step 2: build the image to be actually run
FROM alpine:3.18.4
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ toolchain go1.21.6

require (
github.com/cloudnative-pg/cloudnative-pg v1.22.1-0.20240123130737-a22a155b9eb8
github.com/cloudnative-pg/cnpg-i v0.0.0-20240122164555-5215ff219c8f
github.com/cloudnative-pg/cnpg-i v0.0.0-20240124144003-4c0a1ac46426
github.com/evanphx/json-patch/v5 v5.8.1
github.com/go-logr/logr v1.3.0
github.com/go-logr/zapr v1.2.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ github.com/cloudnative-pg/cloudnative-pg v1.22.1-0.20240123130737-a22a155b9eb8 h
github.com/cloudnative-pg/cloudnative-pg v1.22.1-0.20240123130737-a22a155b9eb8/go.mod h1:r6blheO2ihiuqKbk6rqPN5//PPJnYtKCGT2OxpXtk2o=
github.com/cloudnative-pg/cnpg-i v0.0.0-20240122164555-5215ff219c8f h1:ypwPq45y8ezzwxUTHL0VkzkT2+pcHnE4yRoeGTP8fp8=
github.com/cloudnative-pg/cnpg-i v0.0.0-20240122164555-5215ff219c8f/go.mod h1:0G5GXQVj09KvONIcYURyroL74zOFGjv4eI5OXz7/G/0=
github.com/cloudnative-pg/cnpg-i v0.0.0-20240124144003-4c0a1ac46426 h1:eW94u+AQoFR+KDyIenekcHWCE6Kc48mo8CgGB+VOzKU=
github.com/cloudnative-pg/cnpg-i v0.0.0-20240124144003-4c0a1ac46426/go.mod h1:0G5GXQVj09KvONIcYURyroL74zOFGjv4eI5OXz7/G/0=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
Expand Down
133 changes: 133 additions & 0 deletions internal/fileutils/cp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package fileutils

import (
"bytes"
"fmt"
"io"
"os"
"os/user"
"path/filepath"
"strings"
)

// This implementation is based on https://github.com/nmrshll/go-cp/blob/master/cp.go

func replaceHomeFolder(path string) (string, error) {
if !strings.HasPrefix(path, "~") {
return path, nil
}
var buffer bytes.Buffer
usr, err := user.Current()
if err != nil {
return "", err
}
_, err = buffer.WriteString(usr.HomeDir)
if err != nil {
return "", err
}
_, err = buffer.WriteString(strings.TrimPrefix(path, "~"))
if err != nil {
return "", err
}

return buffer.String(), nil
}

// AbsolutePath converts a path (relative or absolute) into an absolute one.
// Supports '~' notation for $HOME directory of the current user.
func AbsolutePath(path string) (string, error) {
homeReplaced, err := replaceHomeFolder(path)
if err != nil {
return "", err
}
return filepath.Abs(homeReplaced)
}

// CopyFile copies a file from src to dst. If src and dst files exist, and are
// the same, then return success. Otherwise, attempt to create a hard link
// between the two files. If that fails, copy the file contents from src to dst.
// Creates any missing directories. Supports '~' notation for $HOME directory of the current user.
func CopyFile(src, dst string) error {
srcAbs, err := AbsolutePath(src)
if err != nil {
return err
}
dstAbs, err := AbsolutePath(dst)
if err != nil {
return err
}

// open source file
sfi, err := os.Stat(srcAbs)
if err != nil {
return err
}
if !sfi.Mode().IsRegular() {
// cannot copy non-regular files (e.g., directories,
// symlinks, devices, etc.)
return fmt.Errorf("CopyFile: non-regular source file %s (%q)", sfi.Name(), sfi.Mode().String())
}

// open dest file
dfi, err := os.Stat(dstAbs)
if err != nil && !os.IsNotExist(err) {
return err
}

if err != nil {
// file doesn't exist
err := os.MkdirAll(filepath.Dir(dst), 0o750)
if err != nil {
return err
}
} else {
if !(dfi.Mode().IsRegular()) {
return fmt.Errorf("CopyFile: non-regular destination file %s (%q)", dfi.Name(), dfi.Mode().String())
}
if os.SameFile(sfi, dfi) {
return err
}
}
if err = os.Link(src, dst); err == nil {
return err
}
return copyFileContents(src, dst)
}

// copyFileContents copies the contents of the file named src to the file named
// by dst. The file will be created if it does not already exist. If the
// destination file exists, all it's contents will be replaced by the contents
// of the source file.
func copyFileContents(src, dst string) error {
// Open the source file for reading
srcFile, err := os.Open(src) // nolint:gosec
if err != nil {
return err
}
defer func() {
_ = srcFile.Close()
}()

// Open the destination file for writing
dstFile, err := os.Create(dst) // nolint:gosec
if err != nil {
return err
}
// Return any errors that result from closing the destination file
// Will return nil if no errors occurred
defer func() {
cerr := dstFile.Close()
if err == nil {
err = cerr
}
}()

// Copy the contents of the source file into the destination files
size := 1024 * 1024
buf := make([]byte, size)
if _, err = io.CopyBuffer(dstFile, srcFile, buf); err != nil {
return err
}
err = dstFile.Sync()
return err
}
2 changes: 2 additions & 0 deletions internal/fileutils/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package fileutils contains a set of useful functions to manage files
package fileutils
3 changes: 3 additions & 0 deletions internal/wal/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package wal contains the implementation of the
// WAL Manager server
package wal
51 changes: 51 additions & 0 deletions internal/wal/impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package wal

import (
"context"

"github.com/cloudnative-pg/cnpg-i/pkg/wal"
)

// Implementation is the implementation of the identity service
type Implementation struct {
wal.WALServer
}

// GetCapabilities gets the capabilities of the WAL service
func (Implementation) GetCapabilities(
context.Context,
*wal.WALCapabilitiesRequest,
) (*wal.WALCapabilitiesResult, error) {
return &wal.WALCapabilitiesResult{
Capabilities: []*wal.WALCapability{
{
Type: &wal.WALCapability_Rpc{
Rpc: &wal.WALCapability_RPC{
Type: wal.WALCapability_RPC_TYPE_ARCHIVE_WAL,
},
},
},
{
Type: &wal.WALCapability_Rpc{
Rpc: &wal.WALCapability_RPC{
Type: wal.WALCapability_RPC_TYPE_RESTORE_WAL,
},
},
},
{
Type: &wal.WALCapability_Rpc{
Rpc: &wal.WALCapability_RPC{
Type: wal.WALCapability_RPC_TYPE_STATUS,
},
},
},
{
Type: &wal.WALCapability_Rpc{
Rpc: &wal.WALCapability_RPC{
Type: wal.WALCapability_RPC_TYPE_SET_FIRST_REQUIRED,
},
},
},
},
}, nil
}
106 changes: 106 additions & 0 deletions internal/wal/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package wal

import (
"context"
"fmt"
"io/fs"
"os"
"path"

"github.com/cloudnative-pg/cnpg-i/pkg/wal"

"github.com/cloudnative-pg/plugin-pvc-backup/pkg/logging"
"github.com/cloudnative-pg/plugin-pvc-backup/pkg/metadata"
"github.com/cloudnative-pg/plugin-pvc-backup/pkg/pluginhelper"
)

type walStatMode string

const (
walStatModeFirst = "first"
walStatModeLast = "last"
)

// Status gets the statistics of the WAL file archive
func (Implementation) Status(
ctx context.Context,
request *wal.WALStatusRequest,
) (*wal.WALStatusResult, error) {
logging := logging.FromContext(ctx)

helper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.ClusterDefinition)
if err != nil {
logging.Error(err, "Error while decoding cluster definition from CNPG")
return nil, err
}

walPath := getWALPath(helper.GetCluster().Name)
logging = logging.WithValues(
"walPath", walPath,
"clusterName", helper.GetCluster().Name,
)

walDirEntries, err := os.ReadDir(walPath)
if err != nil {
logging.Error(err, "Error while reading WALs directory")
return nil, err
}

firstWal, err := getWALStat(helper.GetCluster().Name, walDirEntries, walStatModeFirst)
if err != nil {
logging.Error(err, "Error while reading WALs directory (getting first WAL)")
return nil, err
}

lastWal, err := getWALStat(helper.GetCluster().Name, walDirEntries, walStatModeLast)
if err != nil {
logging.Error(err, "Error while reading WALs directory (getting first WAL)")
return nil, err
}

return &wal.WALStatusResult{
FirstWal: firstWal,
LastWal: lastWal,
}, nil
}

func getWALStat(clusterName string, entries []fs.DirEntry, mode walStatMode) (string, error) {
entry, ok := getEntry(entries, mode)
if !ok {
return "", nil
}

if !entry.IsDir() {
return "", fmt.Errorf("%s is not a directory", entry)
}

entryAbsolutePath := path.Join(getWALPath(clusterName), entry.Name())
subFolderEntries, err := os.ReadDir(entryAbsolutePath)
if err != nil {
return "", fmt.Errorf("while reading %s entries: %w", entry, err)
}

selectSubFolderEntry, ok := getEntry(subFolderEntries, mode)
if !ok {
return "", nil
}

return selectSubFolderEntry.Name(), nil
}

func getEntry(entries []fs.DirEntry, mode walStatMode) (fs.DirEntry, bool) {
if len(entries) == 0 {
return nil, false
}

switch mode {
case walStatModeFirst:
return entries[0], true

case walStatModeLast:
return entries[len(entries)-1], true

default:
return nil, false
}
}
26 changes: 26 additions & 0 deletions internal/wal/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package wal

import "path"

func getWalPrefix(walName string) string {
return walName[0:16]
}

func getClusterPath(clusterName string) string {
return path.Join(basePath, clusterName)
}

func getWALPath(clusterName string) string {
return path.Join(
getClusterPath(clusterName),
walsDirectory,
)
}

func getWALFilePath(clusterName string, walName string) string {
return path.Join(
getWALPath(clusterName),
getWalPrefix(walName),
walName,
)
}
Loading

0 comments on commit 33c27ee

Please sign in to comment.