Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
Signed-off-by: SK Ali Arman <[email protected]>
  • Loading branch information
sheikh-arman committed May 8, 2024
1 parent f71f184 commit ab0688f
Show file tree
Hide file tree
Showing 13 changed files with 360 additions and 225 deletions.
1 change: 1 addition & 0 deletions pkg/cmds/pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func (o *PauseOptions) Run() error {
}
allErrs = append(allErrs, err)
errs.Insert(err.Error())
continue
}
pauseAll := !(o.onlyBackup || o.onlyDb || o.onlyArchiver)

Expand Down
156 changes: 156 additions & 0 deletions pkg/pauser/archiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package pauser

import (
"context"
coreapi "kubedb.dev/apimachinery/apis/archiver/v1alpha1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kmapi "kmodules.xyz/client-go/api/v1"
kmc "kmodules.xyz/client-go/client"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func PauseOrResumeMySQLArchiver(klient client.Client, value bool, reference kmapi.ObjectReference) error {
name := reference.Name
namespace := reference.Namespace
archiver, err := getMysqlArchiver(klient, kmapi.ObjectReference{
Name: name,
Namespace: namespace,
})
if err != nil {
return err
}
_, err = kmc.CreateOrPatch(
context.Background(),
klient,
archiver,
func(obj client.Object, createOp bool) client.Object {
in := obj.(*coreapi.MySQLArchiver)
in.Spec.Pause = value
return in
},
)
return err
}

func getMysqlArchiver(klient client.Client, ref kmapi.ObjectReference) (*coreapi.MySQLArchiver, error) {
archiver := &coreapi.MySQLArchiver{
ObjectMeta: metav1.ObjectMeta{
Name: ref.Name,
Namespace: ref.Namespace,
},
}
if err := klient.Get(context.Background(), client.ObjectKeyFromObject(archiver), archiver); err != nil {
return nil, err
}
return archiver, nil
}

func PauseOrResumeMariaDBArchiver(klient client.Client, value bool, reference kmapi.ObjectReference) error {
name := reference.Name
namespace := reference.Namespace
archiver, err := getMariaDBArchiver(klient, kmapi.ObjectReference{
Name: name,
Namespace: namespace,
})
if err != nil {
return err
}
_, err = kmc.CreateOrPatch(
context.Background(),
klient,
archiver,
func(obj client.Object, createOp bool) client.Object {
in := obj.(*coreapi.MariaDBArchiver)
in.Spec.Pause = value
return in
},
)
return err
}

func getMariaDBArchiver(klient client.Client, ref kmapi.ObjectReference) (*coreapi.MariaDBArchiver, error) {
archiver := &coreapi.MariaDBArchiver{
ObjectMeta: metav1.ObjectMeta{
Name: ref.Name,
Namespace: ref.Namespace,
},
}
if err := klient.Get(context.Background(), client.ObjectKeyFromObject(archiver), archiver); err != nil {
return nil, err
}

return archiver, nil
}

func PauseOrResumePostgresArchiver(klient client.Client, value bool, reference kmapi.ObjectReference) error {
name := reference.Name
namespace := reference.Namespace
archiver, err := getPostgresArchiver(klient, kmapi.ObjectReference{
Name: name,
Namespace: namespace,
})
if err != nil {
return err
}
_, err = kmc.CreateOrPatch(
context.Background(),
klient,
archiver,
func(obj client.Object, createOp bool) client.Object {
in := obj.(*coreapi.PostgresArchiver)
in.Spec.Pause = value
return in
},
)
return err
}

func getPostgresArchiver(klient client.Client, ref kmapi.ObjectReference) (*coreapi.PostgresArchiver, error) {
archiver := &coreapi.PostgresArchiver{
ObjectMeta: metav1.ObjectMeta{
Name: ref.Name,
Namespace: ref.Namespace,
},
}
if err := klient.Get(context.Background(), client.ObjectKeyFromObject(archiver), archiver); err != nil {
return nil, err
}
return archiver, nil
}

func PauseOrResumeMongoDBArchiver(klient client.Client, value bool, reference kmapi.ObjectReference) error {
name := reference.Name
namespace := reference.Namespace
archiver, err := getMongoDBArchiver(klient, kmapi.ObjectReference{
Name: name,
Namespace: namespace,
})
if err != nil {
return err
}
_, err = kmc.CreateOrPatch(
context.Background(),
klient,
archiver,
func(obj client.Object, createOp bool) client.Object {
in := obj.(*coreapi.MongoDBArchiver)
in.Spec.Pause = value
return in
},
)
return err
}

func getMongoDBArchiver(klient client.Client, ref kmapi.ObjectReference) (*coreapi.MongoDBArchiver, error) {
archiver := &coreapi.MongoDBArchiver{
ObjectMeta: metav1.ObjectMeta{
Name: ref.Name,
Namespace: ref.Namespace,
},
}
if err := klient.Get(context.Background(), client.ObjectKeyFromObject(archiver), archiver); err != nil {
return nil, err
}
return archiver, nil
}
48 changes: 38 additions & 10 deletions pkg/pauser/mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,32 @@ package pauser
import (
"context"

coreapi "kubedb.dev/apimachinery/apis/archiver/v1alpha1"
api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2"
acs "kubedb.dev/apimachinery/client/clientset/versioned/typed/archiver/v1alpha1"
cs "kubedb.dev/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha2"
dbutil "kubedb.dev/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha2/util"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
kmc "kmodules.xyz/client-go/client"
condutil "kmodules.xyz/client-go/conditions"
"sigs.k8s.io/controller-runtime/pkg/client"
scs "stash.appscode.dev/apimachinery/client/clientset/versioned/typed/stash/v1beta1"
)

type MariaDBPauser struct {
dbClient cs.KubedbV1alpha2Interface
stashClient scs.StashV1beta1Interface
onlyDb bool
onlyBackup bool
archiverClient acs.ArchiverV1alpha1Interface
dbClient cs.KubedbV1alpha2Interface
stashClient scs.StashV1beta1Interface
uncachedClient client.Client
onlyDb bool
onlyBackup bool
onlyArchiver bool
}

func NewMariaDBPauser(clientConfig *rest.Config, onlyDb, onlyBackup bool) (*MariaDBPauser, error) {
func NewMariaDBPauser(clientConfig *rest.Config, onlyDb, onlyBackup, onlyArchiver bool) (*MariaDBPauser, error) {
dbClient, err := cs.NewForConfig(clientConfig)
if err != nil {
return nil, err
Expand All @@ -48,11 +55,24 @@ func NewMariaDBPauser(clientConfig *rest.Config, onlyDb, onlyBackup bool) (*Mari
return nil, err
}

archiverClient, err := acs.NewForConfig(clientConfig)
if err != nil {
return nil, err
}

uncachedClient, err := kmc.NewUncachedClient(clientConfig, coreapi.AddToScheme)
if err != nil {
return nil, err
}

return &MariaDBPauser{
dbClient: dbClient,
stashClient: stashClient,
onlyDb: onlyDb,
onlyBackup: onlyBackup,
archiverClient: archiverClient,
dbClient: dbClient,
stashClient: stashClient,
uncachedClient: uncachedClient,
onlyDb: onlyDb,
onlyBackup: onlyBackup,
onlyArchiver: onlyArchiver,
}, nil
}

Expand All @@ -62,7 +82,15 @@ func (e *MariaDBPauser) Pause(name, namespace string) (bool, error) {
return false, nil
}

pauseAll := !(e.onlyBackup || e.onlyDb)
pauseAll := !(e.onlyBackup || e.onlyDb || e.onlyArchiver)
if e.onlyArchiver || pauseAll {
if err := PauseOrResumeMariaDBArchiver(e.uncachedClient, true, db.Spec.Archiver.Ref); err != nil {
return false, err
}
if e.onlyArchiver {
return false, nil
}
}

if e.onlyDb || pauseAll {
_, err = dbutil.UpdateMariaDBStatus(context.TODO(), e.dbClient, db.ObjectMeta, func(status *api.MariaDBStatus) (types.UID, *api.MariaDBStatus) {
Expand Down
32 changes: 21 additions & 11 deletions pkg/pauser/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,27 @@ package pauser
import (
"context"

coreapi "kubedb.dev/apimachinery/apis/archiver/v1alpha1"
api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2"
cs "kubedb.dev/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha2"
dbutil "kubedb.dev/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha2/util"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
kmc "kmodules.xyz/client-go/client"
condutil "kmodules.xyz/client-go/conditions"
"sigs.k8s.io/controller-runtime/pkg/client"
scs "stash.appscode.dev/apimachinery/client/clientset/versioned/typed/stash/v1beta1"
)

type MongoDBPauser struct {
dbClient cs.KubedbV1alpha2Interface
stashClient scs.StashV1beta1Interface
onlyDb bool
onlyBackup bool
onlyArchiver bool
dbClient cs.KubedbV1alpha2Interface
stashClient scs.StashV1beta1Interface
uncachedClient client.Client
onlyDb bool
onlyBackup bool
onlyArchiver bool
}

func NewMongoDBPauser(clientConfig *rest.Config, onlyDb, onlyBackup, onlyArchiver bool) (*MongoDBPauser, error) {
Expand All @@ -49,12 +53,18 @@ func NewMongoDBPauser(clientConfig *rest.Config, onlyDb, onlyBackup, onlyArchive
return nil, err
}

uncachedClient, err := kmc.NewUncachedClient(clientConfig, coreapi.AddToScheme)
if err != nil {
return nil, err
}

return &MongoDBPauser{
dbClient: dbClient,
stashClient: stashClient,
onlyDb: onlyDb,
onlyBackup: onlyBackup,
onlyArchiver: onlyArchiver,
dbClient: dbClient,
stashClient: stashClient,
uncachedClient: uncachedClient,
onlyDb: onlyDb,
onlyBackup: onlyBackup,
onlyArchiver: onlyArchiver,
}, nil
}

Expand All @@ -66,7 +76,7 @@ func (e *MongoDBPauser) Pause(name, namespace string) (bool, error) {

pauseAll := !(e.onlyBackup || e.onlyDb || e.onlyArchiver)
if e.onlyArchiver || pauseAll {
if err := PauseMongoDBArchiver(true, db.Spec.Archiver.Ref.Name, db.Spec.Archiver.Ref.Namespace); err != nil {
if err := PauseOrResumeMongoDBArchiver(e.uncachedClient, true, db.Spec.Archiver.Ref); err != nil {
return false, err
}
if e.onlyArchiver {
Expand Down
32 changes: 21 additions & 11 deletions pkg/pauser/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,27 @@ package pauser
import (
"context"

coreapi "kubedb.dev/apimachinery/apis/archiver/v1alpha1"
api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2"
cs "kubedb.dev/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha2"
dbutil "kubedb.dev/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha2/util"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
kmc "kmodules.xyz/client-go/client"
condutil "kmodules.xyz/client-go/conditions"
"sigs.k8s.io/controller-runtime/pkg/client"
scs "stash.appscode.dev/apimachinery/client/clientset/versioned/typed/stash/v1beta1"
)

type MySQLPauser struct {
dbClient cs.KubedbV1alpha2Interface
stashClient scs.StashV1beta1Interface
onlyDb bool
onlyBackup bool
onlyArchiver bool
dbClient cs.KubedbV1alpha2Interface
stashClient scs.StashV1beta1Interface
uncachedClient client.Client
onlyDb bool
onlyBackup bool
onlyArchiver bool
}

func NewMySQLPauser(clientConfig *rest.Config, onlyDb, onlyBackup, onlyArchiver bool) (*MySQLPauser, error) {
Expand All @@ -49,12 +53,18 @@ func NewMySQLPauser(clientConfig *rest.Config, onlyDb, onlyBackup, onlyArchiver
return nil, err
}

uncachedClient, err := kmc.NewUncachedClient(clientConfig, coreapi.AddToScheme)
if err != nil {
return nil, err
}

return &MySQLPauser{
dbClient: dbClient,
stashClient: stashClient,
onlyDb: onlyDb,
onlyBackup: onlyBackup,
onlyArchiver: onlyArchiver,
dbClient: dbClient,
stashClient: stashClient,
uncachedClient: uncachedClient,
onlyDb: onlyDb,
onlyBackup: onlyBackup,
onlyArchiver: onlyArchiver,
}, nil
}

Expand All @@ -65,7 +75,7 @@ func (e *MySQLPauser) Pause(name string, namespace string) (bool, error) {
}
pauseAll := !(e.onlyBackup || e.onlyDb || e.onlyArchiver)
if e.onlyArchiver || pauseAll {
if err := PauseMySQLArchiver(true, db.Spec.Archiver.Ref.Name, db.Spec.Archiver.Ref.Namespace); err != nil {
if err := PauseOrResumeMySQLArchiver(e.uncachedClient, true, db.Spec.Archiver.Ref); err != nil {
return false, err
}
if e.onlyArchiver {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pauser/pauser.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewPauser(restClientGetter genericclioptions.RESTClientGetter, mapping *met
case api.ResourceKindMySQL:
return NewMySQLPauser(clientConfig, onlyDb, onlyBackup, onlyArchiver)
case api.ResourceKindMariaDB:
return NewMariaDBPauser(clientConfig, onlyDb, onlyBackup)
return NewMariaDBPauser(clientConfig, onlyDb, onlyBackup, onlyArchiver)
case api.ResourceKindPostgres:
return NewPostgresPauser(clientConfig, onlyDb, onlyBackup, onlyArchiver)
case api.ResourceKindRedis:
Expand Down
Loading

0 comments on commit ab0688f

Please sign in to comment.