Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: recycle bin functionality for cephfs #4713

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changelog/unreleased/ceph-recycle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Enhancement: recycle bin functionality for cephfs

This implementation is modeled after the CERN-deployed WinSpaces,
where a folder within each space is designated as the recycle folder
and organized by dates.

https://github.com/cs3org/reva/pull/4713
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/go-playground/validator/v10 v10.19.0
github.com/go-sql-driver/mysql v1.8.0
github.com/gofrs/uuid v4.4.0+incompatible
github.com/gogo/protobuf v1.3.2
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/golang/protobuf v1.5.4
github.com/gomodule/redigo v1.9.2
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,7 @@ github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFG
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
Expand Down
133 changes: 123 additions & 10 deletions pkg/storage/fs/cephfs/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
goceph "github.com/ceph/go-ceph/cephfs"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typepb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/storage"
Expand Down Expand Up @@ -151,6 +152,21 @@ func (fs *cephfs) CreateDir(ctx context.Context, ref *provider.Reference) error
return getRevaError(ctx, err)
}

func getRecycleTargetFromPath(path string, recyclePath string, recyclePathDepth int) (string, error) {
// Tokenize the given (absolute) path
components := strings.Split(filepath.Clean(string(filepath.Separator)+path), string(filepath.Separator))
if recyclePathDepth > len(components)-1 {
return "", errors.New("path is too short")
}

// And construct the target by injecting the recyclePath at the required depth
var target []string = []string{string(filepath.Separator)}
target = append(target, components[:recyclePathDepth+1]...)
target = append(target, recyclePath, time.Now().Format("2006/01/02"))
target = append(target, components[recyclePathDepth+1:]...)
return filepath.Join(target...), nil
}

func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) (err error) {
var path string
user := fs.makeUser(ctx)
Expand All @@ -161,8 +177,16 @@ func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) (err erro

log := appctx.GetLogger(ctx)
user.op(func(cv *cacheVal) {
if err = cv.mount.Unlink(path); err != nil && err.Error() == errIsADirectory {
err = cv.mount.RemoveDir(path)
if fs.conf.RecyclePath != "" {
// Recycle bin is configured, move to recycle as opposed to unlink
targetPath, err := getRecycleTargetFromPath(path, fs.conf.RecyclePath, fs.conf.RecyclePathDepth)
if err == nil {
err = cv.mount.Rename(path, targetPath)
}
} else {
if err = cv.mount.Unlink(path); err != nil && err.Error() == errIsADirectory {
err = cv.mount.RemoveDir(path)
}
}
})

Expand Down Expand Up @@ -502,24 +526,113 @@ func (fs *cephfs) TouchFile(ctx context.Context, ref *provider.Reference) error
return getRevaError(ctx, err)
}

func (fs *cephfs) EmptyRecycle(ctx context.Context) error {
return errtypes.NotSupported("unimplemented")
}
func (fs *cephfs) listDeletedEntries(ctx context.Context, maxentries int, basePath string, from, to time.Time) (res []*provider.RecycleItem, err error) {
res = []*provider.RecycleItem{}
user := fs.makeUser(ctx)
count := 0
rootRecyclePath := filepath.Join(basePath, fs.conf.RecyclePath)
for d := to; !d.Before(from); d = d.AddDate(0, 0, -1) {

func (fs *cephfs) CreateStorageSpace(ctx context.Context, req *provider.CreateStorageSpaceRequest) (r *provider.CreateStorageSpaceResponse, err error) {
return nil, errtypes.NotSupported("unimplemented")
user.op(func(cv *cacheVal) {
var dir *goceph.Directory
if dir, err = cv.mount.OpenDir(filepath.Join(rootRecyclePath, d.Format("2006/01/02"))); err != nil {
return
}
defer closeDir(dir)

var entry *goceph.DirEntryPlus
for entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0) {
//TODO(lopresti) validate content of entry.Name() here.
targetPath := filepath.Join(basePath, entry.Name())
stat := entry.Statx()
res = append(res, &provider.RecycleItem{
Ref: &provider.Reference{Path: targetPath},
Key: filepath.Join(rootRecyclePath, targetPath),
Size: stat.Size,
DeletionTime: &typesv1beta1.Timestamp{
Seconds: uint64(stat.Mtime.Sec),
Nanos: uint32(stat.Mtime.Nsec),
},
})

count += 1
if count > maxentries {
err = errtypes.BadRequest("list too long")
return
}
}
})
}
return res, err
}

func (fs *cephfs) ListRecycle(ctx context.Context, basePath, key, relativePath string, from, to *typepb.Timestamp) ([]*provider.RecycleItem, error) {
return nil, errtypes.NotSupported("unimplemented")
md, err := fs.GetMD(ctx, &provider.Reference{Path: basePath}, nil)
if err != nil {
return nil, err
}
if !md.PermissionSet.ListRecycle {
return nil, errtypes.PermissionDenied("cephfs: user doesn't have permissions to restore recycled items")
}

var dateFrom, dateTo time.Time
if from != nil && to != nil {
dateFrom = time.Unix(int64(from.Seconds), 0)
dateTo = time.Unix(int64(to.Seconds), 0)
if dateFrom.AddDate(0, 0, fs.conf.MaxDaysInRecycleList).Before(dateTo) {
return nil, errtypes.BadRequest("cephfs: too many days requested in listing the recycle bin")
}
} else {
// if no date range was given, list up to two days ago
dateTo = time.Now()
dateFrom = dateTo.AddDate(0, 0, -2)
}

sublog := appctx.GetLogger(ctx).With().Logger()
sublog.Debug().Time("from", dateFrom).Time("to", dateTo).Msg("executing ListDeletedEntries")
recycleEntries, err := fs.listDeletedEntries(ctx, fs.conf.MaxRecycleEntries, basePath, dateFrom, dateTo)
if err != nil {
switch err.(type) {
case errtypes.IsBadRequest:
return nil, errtypes.BadRequest("cephfs: too many entries found in listing the recycle bin")
default:
return nil, errors.Wrap(err, "cephfs: error listing deleted entries")
}
}
return recycleEntries, nil
}

func (fs *cephfs) RestoreRecycleItem(ctx context.Context, basePath, key, relativePath string, restoreRef *provider.Reference) error {
return errtypes.NotSupported("unimplemented")
user := fs.makeUser(ctx)
md, err := fs.GetMD(ctx, &provider.Reference{Path: basePath}, nil)
if err != nil {
return err
}
if !md.PermissionSet.RestoreRecycleItem {
return errtypes.PermissionDenied("cephfs: user doesn't have permissions to restore recycled items")
}

user.op(func(cv *cacheVal) {
//TODO(lopresti) validate content of basePath and relativePath. Key is expected to contain the recycled path
if err = cv.mount.Rename(key, filepath.Join(basePath, relativePath)); err != nil {
return
}
//TODO(tmourati): Add entry id logic, handle already moved file error
})

return getRevaError(err)
}

func (fs *cephfs) PurgeRecycleItem(ctx context.Context, basePath, key, relativePath string) error {
return errtypes.NotSupported("unimplemented")
return errtypes.NotSupported("cephfs: operation not supported")
}

func (fs *cephfs) EmptyRecycle(ctx context.Context) error {
return errtypes.NotSupported("cephfs: operation not supported")
}

func (fs *cephfs) CreateStorageSpace(ctx context.Context, req *provider.CreateStorageSpaceRequest) (r *provider.CreateStorageSpaceResponse, err error) {
return nil, errtypes.NotSupported("unimplemented")
}

func (fs *cephfs) ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error) {
Expand Down
25 changes: 24 additions & 1 deletion pkg/storage/fs/cephfs/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,19 @@ type Options struct {
DirPerms uint32 `mapstructure:"dir_perms"`
FilePerms uint32 `mapstructure:"file_perms"`
UserQuotaBytes uint64 `mapstructure:"user_quota_bytes"`
HiddenDirs map[string]bool
// Path of the recycle bin. If empty, recycling is disabled.
RecyclePath string `mapstructure:"recycle_path"`
// Depth of the Recycle bin location, that is after how many path components
// the recycle path is located: this allows supporting recycles such as
// /top-level/s/space/.recycle with a depth = 3. Defaults to 0.
RecyclePathDepth int `mapstructure:"recycle_path_depth"`
// Maximum entries count a ListRecycle call may return: if exceeded, ListRecycle
// will return a BadRequest error
MaxRecycleEntries int `mapstructure:"max_recycle_entries"`
// Maximum time span in days a ListRecycle call may return: if exceeded, ListRecycle
// will override the "to" date with "from" + this value
MaxDaysInRecycleList int `mapstructure:"max_days_in_recycle_list"`
HiddenDirs map[string]bool
}

func (c *Options) ApplyDefaults() {
Expand Down Expand Up @@ -83,6 +95,9 @@ func (c *Options) ApplyDefaults() {
"..": true,
removeLeadingSlash(c.UploadFolder): true,
}
if c.RecyclePath != "" {
c.HiddenDirs[c.RecyclePath] = true
}

if c.DirPerms == 0 {
c.DirPerms = dirPermDefault
Expand All @@ -95,4 +110,12 @@ func (c *Options) ApplyDefaults() {
if c.UserQuotaBytes == 0 {
c.UserQuotaBytes = 50000000000
}

if c.MaxDaysInRecycleList == 0 {
c.MaxDaysInRecycleList = 14
}

if c.MaxRecycleEntries == 0 {
c.MaxRecycleEntries = 2000
}
}
Loading