Skip to content

Commit

Permalink
feat: create cloud backup
Browse files Browse the repository at this point in the history
CSI create snapshot will now support triggering snapshot that will initiate a cloud backup.

Signed-off-by: Shivanjan Chakravorty <[email protected]>
  • Loading branch information
Glitchfix committed Jul 10, 2023
1 parent 5d3f506 commit 4f5b27d
Show file tree
Hide file tree
Showing 7 changed files with 966 additions and 18 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -430,12 +430,15 @@ mockgen:
mockgen -destination=api/mock/mock_diags.go -package=mock github.com/libopenstorage/openstorage/api OpenStorageDiagsServer,OpenStorageDiagsClient
mockgen -destination=api/mock/mock_volume.go -package=mock github.com/libopenstorage/openstorage/api OpenStorageVolumeServer,OpenStorageVolumeClient
mockgen -destination=api/mock/mock_bucket.go -package=mock github.com/libopenstorage/openstorage/api OpenStorageBucketServer,OpenStorageBucketClient
mockgen -destination=api/mock/mock_cloud_backup.go -package=mock github.com/libopenstorage/openstorage/api OpenStorageCloudBackupServer,OpenStorageCloudBackupClient
mockgen -destination=cluster/mock/cluster.mock.go -package=mock github.com/libopenstorage/openstorage/cluster Cluster
mockgen -destination=api/mock/mock_fstrim.go -package=mock github.com/libopenstorage/openstorage/api OpenStorageFilesystemTrimServer,OpenStorageFilesystemTrimClient
mockgen -destination=api/mock/mock_fscheck.go -package=mock github.com/libopenstorage/openstorage/api OpenStorageFilesystemCheckServer,OpenStorageFilesystemCheckClient
mockgen -destination=api/server/mock/mock_schedops_k8s.go -package=mock github.com/portworx/sched-ops/k8s/core Ops
mockgen -destination=volume/drivers/mock/driver.mock.go -package=mock github.com/libopenstorage/openstorage/volume VolumeDriver
mockgen -destination=bucket/drivers/mock/bucket_driver.mock.go -package=mock github.com/libopenstorage/openstorage/bucket BucketDriver
mockgen -destination=pkg/loadbalancer/mock/balancer.go -package=mock github.com/libopenstorage/openstorage/pkg/loadbalancer Balancer


osd-tests: install
./hack/csi-sanity-test.sh
Expand Down
585 changes: 585 additions & 0 deletions api/mock/mock_cloud_backup.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions api/server/sdk/volume_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,10 +768,10 @@ func (s *VolumeServer) VolumeBytesUsedByNode(
req *api.SdkVolumeBytesUsedRequest,
) (*api.SdkVolumeBytesUsedResponse, error) {
return nil, status.Errorf(
codes.Unimplemented,
"Failed to obtain volume utilization on node %s: %v",
req.GetNodeId(),
volume.ErrNotSupported.Error())
codes.Unimplemented,
"Failed to obtain volume utilization on node %s: %v",
req.GetNodeId(),
volume.ErrNotSupported.Error())
}

func (s *VolumeServer) CapacityUsage(
Expand Down
107 changes: 107 additions & 0 deletions csi/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ const (
osdPvcAnnotationsKey = osdParameterPrefix + "pvc-annotations"
osdPvcLabelsKey = osdParameterPrefix + "pvc-labels"

// These keys are for accessing Snapshot Metadata added from the external-provisioner
osdSnapshotLabelsTypeKey = osdParameterPrefix + "snapshot-type"
osdSnapshotCredentialIDKey = osdParameterPrefix + "credential-id"

// in-tree keys for name and namespace
intreePvcNameKey = "pvc"
intreePvcNamespaceKey = "namespace"
Expand All @@ -60,6 +64,10 @@ const (
volumeCapabilityMessageReadOnlyVolume = "Volume is read only"
volumeCapabilityMessageNotReadOnlyVolume = "Volume is not read only"
defaultCSIVolumeSize = uint64(units.GiB * 1)

// driver type
DriverTypeLocal = "local"
DriverTypeCloud = "cloud"
)

// ControllerGetCapabilities is a CSI API functions which returns to the caller
Expand Down Expand Up @@ -887,6 +895,35 @@ func (s *OsdCsiServer) CreateSnapshot(
return nil, status.Error(codes.InvalidArgument, "Name must be provided")
}

// Get secret if any was passed
ctx = s.setupContext(ctx, req.GetSecrets())
ctx, cancel := grpcutil.WithDefaultTimeout(ctx)
defer cancel()

// Get any labels passed in by the CO
_, locator, _, err := s.specHandler.SpecFromOpts(req.GetParameters())
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Unable to get parameters: %v", err)
}
// Check ID is valid with the specified volume capabilities
snapshotType, ok := locator.VolumeLabels[osdSnapshotLabelsTypeKey]
if !ok {
snapshotType = DriverTypeLocal
}
switch snapshotType {
case DriverTypeCloud:
return s.createCloudBackup(ctx, req)
case DriverTypeLocal:
fallthrough
default:
return s.createLocalSnapshot(ctx, req)
}
}

func (s *OsdCsiServer) createLocalSnapshot(
ctx context.Context,
req *csi.CreateSnapshotRequest,
) (*csi.CreateSnapshotResponse, error) {
// Get grpc connection
conn, err := s.getConn()
if err != nil {
Expand Down Expand Up @@ -980,6 +1017,76 @@ func (s *OsdCsiServer) CreateSnapshot(
},
}, nil
}
func (s *OsdCsiServer) getCloudBackupClient(ctx context.Context) (api.OpenStorageCloudBackupClient, error) {
// Get grpc connection
conn, err := s.getRemoteConn(ctx)
if err != nil {
return nil, status.Errorf(
codes.Unavailable,
"Unable to connect to SDK server: %v", err)
}
return s.cloudBackupClient(conn), nil
}

func (s *OsdCsiServer) createCloudBackup(
ctx context.Context,
req *csi.CreateSnapshotRequest,
) (*csi.CreateSnapshotResponse, error) {
cloudBackupClient, err := s.getCloudBackupClient(ctx)
if err != nil {
return nil, err
}

// Get any labels passed in by the CO
_, locator, _, err := s.specHandler.SpecFromOpts(req.GetParameters())
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Unable to get parameters: %v", err)
}

credentialID := locator.VolumeLabels[osdSnapshotCredentialIDKey]
backupID := req.GetName()
// Create snapshot
_, err = cloudBackupClient.Create(ctx, &api.SdkCloudBackupCreateRequest{
VolumeId: req.GetSourceVolumeId(),
TaskId: backupID,
CredentialId: credentialID,
Labels: locator.GetVolumeLabels(),
})

if err != nil {
return nil, status.Errorf(codes.Aborted, "Failed to create cloud snapshot: %v", err)
}

var isBackupReady bool
var backupStatus *api.SdkCloudBackupStatusResponse

// Check if snapshot has been created but is in error state
backupStatus, errFindFailed := cloudBackupClient.Status(ctx, &api.SdkCloudBackupStatusRequest{
VolumeId: req.GetSourceVolumeId(),
TaskId: backupID,
})
if errFindFailed != nil {
return nil, status.Errorf(codes.Aborted, "Failed to create cloud snapshot: %v", err)
}
isBackupReady = backupStatus.Statuses[backupID].Status == api.SdkCloudBackupStatusType_SdkCloudBackupStatusTypeDone

snapSize, errSizeFailed := cloudBackupClient.Size(ctx, &api.SdkCloudBackupSizeRequest{
BackupId: backupID,
})
if errSizeFailed != nil {
return nil, status.Errorf(codes.Aborted, "Failed to get cloud snapshot size: %v", err)
}

return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: int64(snapSize.GetTotalDownloadBytes()),
SnapshotId: backupID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: backupStatus.Statuses[backupID].StartTime,
ReadyToUse: isBackupReady,
},
}, nil
}

// DeleteSnapshot is a CSI implementation to delete a snapshot
func (s *OsdCsiServer) DeleteSnapshot(
Expand Down
205 changes: 201 additions & 4 deletions csi/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,29 @@ package csi

import (
"encoding/json"
"errors"
"fmt"
"math"
"reflect"
"sync"
"testing"

csi "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/mock/gomock"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"

"github.com/libopenstorage/openstorage/api"
"github.com/libopenstorage/openstorage/api/mock"
"github.com/libopenstorage/openstorage/api/spec"
authsecrets "github.com/libopenstorage/openstorage/pkg/auth/secrets"
mockLoadBalancer "github.com/libopenstorage/openstorage/pkg/loadbalancer/mock"
"github.com/libopenstorage/openstorage/pkg/units"

csi "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

func containsCap(c csi.ControllerServiceCapability_RPC_Type, resp *csi.ControllerGetCapabilitiesResponse) bool {
Expand Down Expand Up @@ -3372,3 +3378,194 @@ func TestGetCapacity(t *testing.T) {
assert.NotNil(t, res)
assert.Equal(t, int64(0), res.AvailableCapacity)
}

type fakeOsdCsiServer struct {
*OsdCsiServer
mockCloudBackupClient api.OpenStorageCloudBackupClient
}

func (f *fakeOsdCsiServer) getCloudBackupClient(ctx context.Context) (api.OpenStorageCloudBackupClient, error) {
return f.mockCloudBackupClient, nil
}
func TestOsdCsiServer_CreateSnapshot(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockCloudBackupClient := mock.NewMockOpenStorageCloudBackupClient(ctrl)

ctx := context.Background()

mockErr := errors.New("MOCK ERROR")
creationTime := timestamppb.Now()

mockCloudBackupClient.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, req *api.SdkCloudBackupCreateRequest, opts ...grpc.CallOption) (*api.SdkCloudBackupCreateResponse, error) {
if req.TaskId == "create-error" {
return nil, mockErr
}

if req.TaskId == "create-notfound" {
return nil, status.Errorf(codes.NotFound, "Volume id not found")
}

return &api.SdkCloudBackupCreateResponse{
TaskId: req.TaskId,
}, nil

}).AnyTimes()

mockCloudBackupClient.EXPECT().Status(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, req *api.SdkCloudBackupStatusRequest, opts ...grpc.CallOption) (*api.SdkCloudBackupStatusResponse, error) {
if req.TaskId == "status-error" {
return nil, mockErr
}

// if req.TaskId == "status-failed" ||
if req.TaskId == "delete-error" {
return &api.SdkCloudBackupStatusResponse{
Statuses: map[string]*api.SdkCloudBackupStatus{
req.TaskId: {
Status: api.SdkCloudBackupStatusType_SdkCloudBackupStatusTypeFailed,
StartTime: creationTime,
},
},
}, nil
}

return &api.SdkCloudBackupStatusResponse{
Statuses: map[string]*api.SdkCloudBackupStatus{
req.TaskId: {
Status: api.SdkCloudBackupStatusType_SdkCloudBackupStatusTypeDone,
StartTime: creationTime,
},
},
}, nil

}).AnyTimes()

mockCloudBackupClient.EXPECT().Delete(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, req *api.SdkCloudBackupDeleteRequest, opts ...grpc.CallOption) (*api.SdkCloudBackupDeleteResponse, error) {
if req.BackupId == "delete-error" {
return nil, mockErr
}

return &api.SdkCloudBackupDeleteResponse{}, nil

}).AnyTimes()

mockCloudBackupClient.EXPECT().Size(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, req *api.SdkCloudBackupSizeRequest, opts ...grpc.CallOption) (*api.SdkCloudBackupSizeResponse, error) {
if req.BackupId == "size-error" {
return nil, mockErr
}

return &api.SdkCloudBackupSizeResponse{
TotalDownloadBytes: defaultCSIVolumeSize,
}, nil

}).AnyTimes()

mockSourceVolumeID := "mock-volume-id"

tests := []struct {
name string
SnapshotName string
want *csi.CreateSnapshotResponse
wantErr bool
}{
{
"remote client connection failed",
"remote-client-error",
nil,
true,
},
{
"fail snapshot create",
"create-error",
nil,
true,
},
{
"volume id not found while creating",
"create-notfound",
nil,
true,
},
{
"fail to get snapshot status",
"status-error",
nil,
true,
},
{
"fail to cleanup failed snapshot",
"delete-error",
nil,
true,
},
{
"fail to get snapshot size",
"size-error",
nil,
true,
},
{
"creation completes without any error",
"ok",
&csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: int64(defaultCSIVolumeSize),
SnapshotId: "ok",
SourceVolumeId: mockSourceVolumeID,
CreationTime: creationTime,
ReadyToUse: true,
},
},
false,
},
}
mockRoundRobinBalancer := mockLoadBalancer.NewMockBalancer(ctrl)
// nil, false, nil
mockRoundRobinBalancer.EXPECT().GetRemoteNodeConnection(gomock.Any()).DoAndReturn(
func(ctx context.Context) (*grpc.ClientConn, bool, error) {
var err error
if ctx.Value("remote-client-error").(bool) {
err = mockErr
}
return nil, false, err
}).AnyTimes()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req := &csi.CreateSnapshotRequest{
Name: tt.SnapshotName,
SourceVolumeId: mockSourceVolumeID,
Parameters: map[string]string{
api.SpecLabels: osdSnapshotLabelsTypeKey + "=cloud",
},
}

s := &OsdCsiServer{
specHandler: spec.NewSpecHandler(),
mu: sync.Mutex{},
cloudBackupClient: func(cc grpc.ClientConnInterface) api.OpenStorageCloudBackupClient {
return mockCloudBackupClient
},
roundRobinBalancer: mockRoundRobinBalancer,
}

doClientErr := tt.SnapshotName == "remote-client-error"

ctx = context.WithValue(ctx, "remote-client-error", doClientErr)

got, err := s.CreateSnapshot(ctx, req)
if (err != nil) != tt.wantErr {
t.Errorf("OsdCsiServer.CreateSnapshot() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("OsdCsiServer.CreateSnapshot() = %v, want %v", got, tt.want)
}
})
}
}
Loading

0 comments on commit 4f5b27d

Please sign in to comment.