Skip to content

Commit

Permalink
Merge pull request #8198 from kaovilai/pes-controller
Browse files Browse the repository at this point in the history
Add controller name to periodical_enqueue_source
  • Loading branch information
ywk253100 authored Sep 12, 2024
2 parents 1110853 + c8aa37d commit 5b4c8cd
Show file tree
Hide file tree
Showing 14 changed files with 25 additions and 14 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8198-kaovilai
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add controller name to periodical_enqueue_source. The logger parameter now includes an additional field with the value of reflect.TypeOf(objList).String() and another field with the value of controllerName.
2 changes: 2 additions & 0 deletions pkg/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const (
ControllerBackupRepo = "backup-repo"
ControllerBackupStorageLocation = "backup-storage-location"
ControllerBackupSync = "backup-sync"
ControllerDataDownload = "data-download"
ControllerDataUpload = "data-upload"
ControllerDownloadRequest = "download-request"
ControllerGarbageCollection = "gc"
ControllerPodVolumeBackup = "pod-volume-backup"
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/backup_deletion_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func NewBackupDeletionReconciler(

func (r *backupDeletionReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Make sure the expired requests can be deleted eventually
s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.DeleteBackupRequestList{}, time.Hour, kube.PeriodicalEnqueueSourceOption{})
s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerBackupDeletion), mgr.GetClient(), &velerov1api.DeleteBackupRequestList{}, time.Hour, kube.PeriodicalEnqueueSourceOption{})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.DeleteBackupRequest{}).
WatchesRawSource(s, nil).
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/backup_operations_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/constant"
"github.com/vmware-tanzu/velero/pkg/itemoperation"
"github.com/vmware-tanzu/velero/pkg/itemoperationmap"
"github.com/vmware-tanzu/velero/pkg/metrics"
Expand Down Expand Up @@ -84,7 +85,7 @@ func NewBackupOperationsReconciler(
}

func (c *backupOperationsReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1api.BackupList{}, c.frequency, kube.PeriodicalEnqueueSourceOption{})
s := kube.NewPeriodicalEnqueueSource(c.logger.WithField("controller", constant.ControllerBackupOperations), mgr.GetClient(), &velerov1api.BackupList{}, c.frequency, kube.PeriodicalEnqueueSourceOption{})
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
backup := object.(*velerov1api.Backup)
return (backup.Status.Phase == velerov1api.BackupPhaseWaitingForPluginOperations ||
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/backup_repository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/constant"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/repository"
repoconfig "github.com/vmware-tanzu/velero/pkg/repository/config"
Expand Down Expand Up @@ -75,7 +76,7 @@ func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client
}

func (r *BackupRepoReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.BackupRepositoryList{}, repoSyncPeriod, kube.PeriodicalEnqueueSourceOption{})
s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerBackupRepo), mgr.GetClient(), &velerov1api.BackupRepositoryList{}, repoSyncPeriod, kube.PeriodicalEnqueueSourceOption{})

return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.BackupRepository{}, builder.WithPredicates(kube.SpecChangePredicate{})).
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/backup_storage_location_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (r *backupStorageLocationReconciler) logReconciledPhase(defaultFound bool,

func (r *backupStorageLocationReconciler) SetupWithManager(mgr ctrl.Manager) error {
g := kube.NewPeriodicalEnqueueSource(
r.log,
r.log.WithField("controller", constant.ControllerBackupStorageLocation),
mgr.GetClient(),
&velerov1api.BackupStorageLocationList{},
bslValidationEnqueuePeriod,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/backup_sync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (b *backupSyncReconciler) filterBackupOwnerReferences(ctx context.Context,
// SetupWithManager is used to setup controller and its watching sources.
func (b *backupSyncReconciler) SetupWithManager(mgr ctrl.Manager) error {
backupSyncSource := kube.NewPeriodicalEnqueueSource(
b.logger,
b.logger.WithField("controller", constant.ControllerBackupSync),
mgr.GetClient(),
&velerov1api.BackupStorageLocationList{},
backupSyncReconcilePeriod,
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/constant"
datamover "github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
Expand Down Expand Up @@ -497,7 +498,7 @@ func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, nam
// re-enqueue the previous related request once the related pod is in running status to keep going on the rest logic. and below logic will avoid handling the unwanted
// pod status and also avoid block others CR handling
func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(r.logger, r.client, &velerov2alpha1api.DataDownloadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{})
s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerDataDownload), r.client, &velerov2alpha1api.DataDownloadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{})
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
dd := object.(*velerov2alpha1api.DataDownload)
return (dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted)
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/constant"
"github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
Expand Down Expand Up @@ -551,7 +552,7 @@ func (r *DataUploadReconciler) OnDataUploadProgress(ctx context.Context, namespa
// re-enqueue the previous related request once the related pod is in running status to keep going on the rest logic. and below logic will avoid handling the unwanted
// pod status and also avoid block others CR handling
func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(r.logger, r.client, &velerov2alpha1api.DataUploadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{})
s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerDataUpload), r.client, &velerov2alpha1api.DataUploadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{})
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
du := object.(*velerov2alpha1api.DataUpload)
return (du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted)
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/download_request_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
kbclient "sigs.k8s.io/controller-runtime/pkg/client"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/constant"
"github.com/vmware-tanzu/velero/pkg/itemoperationmap"
"github.com/vmware-tanzu/velero/pkg/persistence"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
Expand Down Expand Up @@ -218,7 +219,7 @@ func (r *downloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}

func (r *downloadRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
downloadRequestSource := kube.NewPeriodicalEnqueueSource(r.log, mgr.GetClient(),
downloadRequestSource := kube.NewPeriodicalEnqueueSource(r.log.WithField("controller", constant.ControllerDownloadRequest), mgr.GetClient(),
&velerov1api.DownloadRequestList{}, defaultDownloadRequestSyncPeriod, kube.PeriodicalEnqueueSourceOption{})
downloadRequestPredicates := kube.NewGenericEventPredicate(func(object kbclient.Object) bool {
downloadRequest := object.(*velerov1api.DownloadRequest)
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/gc_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
pkgbackup "github.com/vmware-tanzu/velero/pkg/backup"
veleroclient "github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/constant"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
Expand Down Expand Up @@ -75,7 +76,7 @@ func NewGCReconciler(
// Other Events will be filtered to decrease the number of reconcile call. Especially UpdateEvent must be filtered since we removed
// the backup status as the sub-resource of backup in v1.9, every change on it will be treated as UpdateEvent and trigger reconcile call.
func (c *gcReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1api.BackupList{}, c.frequency, kube.PeriodicalEnqueueSourceOption{})
s := kube.NewPeriodicalEnqueueSource(c.logger.WithField("controller", constant.ControllerGarbageCollection), mgr.GetClient(), &velerov1api.BackupList{}, c.frequency, kube.PeriodicalEnqueueSourceOption{})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.Backup{}, builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/restore_operations_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/constant"
"github.com/vmware-tanzu/velero/pkg/itemoperation"
"github.com/vmware-tanzu/velero/pkg/itemoperationmap"
"github.com/vmware-tanzu/velero/pkg/metrics"
Expand Down Expand Up @@ -82,7 +83,7 @@ func NewRestoreOperationsReconciler(
}

func (r *restoreOperationsReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.RestoreList{}, r.frequency, kube.PeriodicalEnqueueSourceOption{})
s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerRestoreOperations), mgr.GetClient(), &velerov1api.RestoreList{}, r.frequency, kube.PeriodicalEnqueueSourceOption{})
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
restore := object.(*velerov1api.Restore)
return (restore.Status.Phase == velerov1api.RestorePhaseWaitingForPluginOperations ||
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/schedule_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/constant"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
Expand Down Expand Up @@ -69,7 +70,7 @@ func NewScheduleReconciler(
}

func (c *scheduleReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1.ScheduleList{}, scheduleSyncPeriod, kube.PeriodicalEnqueueSourceOption{})
s := kube.NewPeriodicalEnqueueSource(c.logger.WithField("controller", constant.ControllerSchedule), mgr.GetClient(), &velerov1.ScheduleList{}, scheduleSyncPeriod, kube.PeriodicalEnqueueSourceOption{})
return ctrl.NewControllerManagedBy(mgr).
// global predicate, works for both For and Watch
WithEventFilter(kube.NewAllEventPredicate(func(obj client.Object) bool {
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/kube/periodical_enqueue_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestStart(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.TODO())
client := (&fake.ClientBuilder{}).Build()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter())
source := NewPeriodicalEnqueueSource(logrus.WithContext(ctx), client, &velerov1.ScheduleList{}, 1*time.Second, PeriodicalEnqueueSourceOption{})
source := NewPeriodicalEnqueueSource(logrus.WithContext(ctx).WithField("controller", "PES_TEST"), client, &velerov1.ScheduleList{}, 1*time.Second, PeriodicalEnqueueSourceOption{})

require.NoError(t, source.Start(ctx, nil, queue))

Expand Down Expand Up @@ -75,7 +75,7 @@ func TestPredicate(t *testing.T) {
client := (&fake.ClientBuilder{}).Build()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter())
source := NewPeriodicalEnqueueSource(
logrus.WithContext(ctx),
logrus.WithContext(ctx).WithField("controller", "PES_TEST"),
client,
&velerov1.BackupStorageLocationList{},
1*time.Second,
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestOrder(t *testing.T) {
client := (&fake.ClientBuilder{}).Build()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter())
source := NewPeriodicalEnqueueSource(
logrus.WithContext(ctx),
logrus.WithContext(ctx).WithField("controller", "PES_TEST"),
client,
&velerov1.BackupStorageLocationList{},
1*time.Second,
Expand Down

0 comments on commit 5b4c8cd

Please sign in to comment.