Fix metrics and add new metrics for timed out jobs
Signed-off-by: Florent Poinsard <[email protected]>
frouioui committed May 30, 2024
1 parent 9739a94 commit 46b6967
Showing 3 changed files with 87 additions and 63 deletions.
15 changes: 14 additions & 1 deletion pkg/controller/vitessbackupschedule/metrics.go
@@ -27,16 +27,29 @@ const (
 )
 
 var (
+	backupScheduleLabels = []string{
+		metrics.BackupScheduleLabel,
+		metrics.ResultLabel,
+	}
+
 	reconcileCount = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: metrics.Namespace,
 		Subsystem: metricsSubsystemName,
 		Name:      "reconcile_count",
 		Help:      "Reconciliation attempts for a VitessBackupSchedule",
-	}, []string{metrics.BackupStorageLabel, metrics.ResultLabel})
+	}, backupScheduleLabels)
+
+	timeoutJobsCount = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: metrics.Namespace,
+		Subsystem: metricsSubsystemName,
+		Name:      "timeout_jobs_removed_count",
+		Help:      "Number of timed out jobs that were removed for a VitessBackupSchedule",
+	}, backupScheduleLabels)
 )
 
 func init() {
 	metrics.Registry.MustRegister(
 		reconcileCount,
+		timeoutJobsCount,
 	)
 }
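Both counters above share one label slice, so the label order stays consistent between them. A minimal standalone sketch of the same client_golang pattern follows; the vbsc_demo namespace, the local registry, and the label values are illustrative, not the operator's.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	// Shared label set: every metric using it must be incremented with
	// values for both labels, in this declaration order.
	backupScheduleLabels = []string{"backup_schedule", "result"}

	reconcileCount = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "vbsc_demo",
		Name:      "reconcile_count",
		Help:      "Reconciliation attempts for a VitessBackupSchedule",
	}, backupScheduleLabels)

	timeoutJobsCount = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "vbsc_demo",
		Name:      "timeout_jobs_removed_count",
		Help:      "Number of timed out jobs that were removed",
	}, backupScheduleLabels)
)

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(reconcileCount, timeoutJobsCount)

	// One successful reconcile, one failed timed-out-job deletion.
	reconcileCount.WithLabelValues("daily-backup", "success").Inc()
	timeoutJobsCount.WithLabelValues("daily-backup", "failure").Inc()

	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), len(mf.GetMetric()))
	}
}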
133 changes: 71 additions & 62 deletions pkg/controller/vitessbackupschedule/vitessbackupschedule_controller.go
@@ -32,6 +32,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	apilabels "k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/tools/record"
+	"planetscale.dev/vitess-operator/pkg/operator/metrics"
 	"planetscale.dev/vitess-operator/pkg/operator/reconciler"
 	"planetscale.dev/vitess-operator/pkg/operator/results"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -157,8 +158,9 @@ func (r *ReconcileVitessBackupsSchedule) Reconcile(ctx context.Context, req ctrl
 	})
 	log.Info("Reconciling VitessBackupSchedule")
 
+	var err error
 	var vbsc planetscalev2.VitessBackupSchedule
-	if err := r.client.Get(ctx, req.NamespacedName, &vbsc); err != nil {
+	if err = r.client.Get(ctx, req.NamespacedName, &vbsc); err != nil {
 		log.WithError(err).Error("unable to fetch VitessBackupSchedule")
 		if apierrors.IsNotFound(err) {
 			// Request object not found, could have been deleted after reconcile request.
@@ -170,6 +172,11 @@ func (r *ReconcileVitessBackupsSchedule) Reconcile(ctx context.Context, req ctrl
 		return resultBuilder.Error(err)
 	}
 
+	// Register this reconciling attempt no matter if we fail or succeed.
+	defer func() {
+		reconcileCount.WithLabelValues(vbsc.Name, metrics.Result(err)).Inc()
+	}()
+
 	jobs, mostRecentTime, err := r.getJobsList(ctx, req, vbsc.Name)
 	if err != nil {
 		// We had an error reading the jobs, we can requeue.
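The hoisted var err error and the := to = changes in this hunk exist because the deferred closure records err only when Reconcile returns: Go closures capture variables by reference, so every branch must assign the same err. An err declared with := inside an if initializer is scoped to that statement and would be invisible to the defer, making the metric always report success. A standalone sketch of the pattern, with illustrative names:

package main

import (
	"errors"
	"fmt"
)

func reconcile(fail bool) error {
	var err error
	// The defer reads err at return time, so it sees whatever the
	// function body last assigned, but only if the body uses =.
	defer func() {
		fmt.Printf("metric recorded: success=%v\n", err == nil)
	}()

	if fail {
		err = errors.New("fetch failed") // = updates the captured err
	}
	return err
}

func main() {
	reconcile(false) // metric recorded: success=true
	reconcile(true)  // metric recorded: success=false
}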
@@ -187,7 +194,7 @@ func (r *ReconcileVitessBackupsSchedule) Reconcile(ctx context.Context, req ctrl
 	r.cleanupJobsWithLimit(ctx, jobs.failed, vbsc.GetFailedJobsLimit())
 	r.cleanupJobsWithLimit(ctx, jobs.successful, vbsc.GetSuccessfulJobsLimit())
 
-	err = r.removeTimeoutJobs(ctx, jobs.active, vbsc.Spec.JobTimeoutMinutes)
+	err = r.removeTimeoutJobs(ctx, jobs.active, vbsc.Name, vbsc.Spec.JobTimeoutMinutes)
 	if err != nil {
 		// We had an error while removing timed out jobs, we can requeue
 		return resultBuilder.Error(err)
@@ -232,7 +239,7 @@ func (r *ReconcileVitessBackupsSchedule) Reconcile(ctx context.Context, req ctrl
 	// Check concurrency policy to know if we should replace existing jobs
 	if vbsc.Spec.ConcurrencyPolicy == planetscalev2.ReplaceConcurrent {
 		for _, activeJob := range jobs.active {
-			if err := r.client.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
+			if err = r.client.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
 				log.WithError(err).Errorf("unable to delete active job: %s", activeJob.Name)
 				return ctrl.Result{}, err
 			}
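Here the := to = switch again keeps the deletion error visible to the deferred metric, while client.IgnoreNotFound makes a job that was already deleted by someone else non-fatal: only real API errors abort the reconcile. A standalone sketch of the NotFound filtering, using only apimachinery so it runs without a cluster (the group/resource and job name are illustrative):

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// ignoreNotFound mirrors controller-runtime's client.IgnoreNotFound:
// a delete that races with garbage collection is treated as success.
func ignoreNotFound(err error) error {
	if apierrors.IsNotFound(err) {
		return nil
	}
	return err
}

func main() {
	gr := schema.GroupResource{Group: "batch", Resource: "jobs"}
	err := apierrors.NewNotFound(gr, "vitessbackupschedule-123")
	fmt.Println(ignoreNotFound(err)) // <nil>: not a real failure
}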
@@ -394,7 +401,7 @@ func (r *ReconcileVitessBackupsSchedule) cleanupJobsWithLimit(ctx context.Contex
 	}
 }
 
-func (r *ReconcileVitessBackupsSchedule) removeTimeoutJobs(ctx context.Context, jobs []*kbatch.Job, timeout int32) error {
+func (r *ReconcileVitessBackupsSchedule) removeTimeoutJobs(ctx context.Context, jobs []*kbatch.Job, vbscName string, timeout int32) error {
 	if timeout == -1 {
 		return nil
 	}
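The early return makes -1 an explicit "never time out" sentinel; any other value is read as minutes since the job started, as the time comparison in the next hunk shows. A standalone sketch of that check, with an illustrative helper name and values:

package main

import (
	"fmt"
	"time"
)

// timedOut reports whether a job that started at start has exceeded
// timeout minutes; timeout == -1 disables the check entirely.
func timedOut(start time.Time, timeout int32) bool {
	if timeout == -1 {
		return false
	}
	return start.Add(time.Minute * time.Duration(timeout)).Before(time.Now())
}

func main() {
	start := time.Now().Add(-45 * time.Minute)
	fmt.Println(timedOut(start, 30)) // true: 45m old, 30m budget
	fmt.Println(timedOut(start, 60)) // false: still within budget
	fmt.Println(timedOut(start, -1)) // false: timeouts disabled
}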
@@ -404,11 +411,12 @@ func (r *ReconcileVitessBackupsSchedule) removeTimeoutJobs(ctx context.Context,
 			return err
 		}
 		if jobStartTime.Add(time.Minute * time.Duration(timeout)).Before(time.Now()) {
-			if err := r.client.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); (err) != nil {
+			if err = r.client.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); (err) != nil {
 				log.WithError(err).Errorf("unable to delete timed out job: %s", job.Name)
 			} else {
 				log.Infof("deleted timed out job: %s", job.Name)
 			}
+			timeoutJobsCount.WithLabelValues(vbscName, metrics.Result(err)).Inc()
 		}
 	}
 	return nil
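The new counter increment sits after the if/else rather than inside one branch, so deletion attempts are counted whether they succeed or fail, distinguished only by the result label that metrics.Result(err) produces. A sketch of what such a helper does; the operator's real metrics.Result lives in pkg/operator/metrics and its exact label strings are not shown here, so these values are assumptions:

package main

import (
	"errors"
	"fmt"
)

// result maps an error to a metrics label value; "success" and
// "error" are assumed strings, the real helper may use others.
func result(err error) string {
	if err != nil {
		return "error"
	}
	return "success"
}

func main() {
	fmt.Println(result(nil))                // success
	fmt.Println(result(errors.New("boom"))) // error
}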
@@ -486,69 +494,70 @@ func (r *ReconcileVitessBackupsSchedule) createJob(ctx context.Context, vbsc *pl
 }
 
 func (r *ReconcileVitessBackupsSchedule) createJobPod(ctx context.Context, vbsc *planetscalev2.VitessBackupSchedule, name string) (pod corev1.PodSpec, err error) {
-	getVtctldServiceName := func(cluster string) (string, error) {
-		vtctldServiceName, vtctldServicePort, err := r.getVtctldServiceName(ctx, vbsc, cluster)
-		if err != nil {
-			return "", err
-		}
-		return fmt.Sprintf("--server=%s:%d", vtctldServiceName, vtctldServicePort), nil
-	}
+	// getVtctldServiceName := func(cluster string) (string, error) {
+	// 	vtctldServiceName, vtctldServicePort, err := r.getVtctldServiceName(ctx, vbsc, cluster)
+	// 	if err != nil {
+	// 		return "", err
+	// 	}
+	// 	return fmt.Sprintf("--server=%s:%d", vtctldServiceName, vtctldServicePort), nil
+	// }
 
 	// It is fine to not have any default in the event there is no strategy as the CRD validation
 	// ensures that there will be at least one item in this list. The YAML cannot be applied with
 	// empty list of strategies.
 	var cmd strings.Builder
 
-	addNewCmd := func(i int) {
-		if i > 0 {
-			cmd.WriteString(" && ")
-		}
-	}
-
-	for i, strategy := range vbsc.Spec.Strategy {
-		vtctldclientServerArg, err := getVtctldServiceName(vbsc.Spec.Cluster)
-		if err != nil {
-			return corev1.PodSpec{}, err
-		}
-
-		addNewCmd(i)
-		switch strategy.Name {
-		case planetscalev2.BackupShard:
-			if strategy.Keyspace == "" {
-				return pod, fmt.Errorf("the Keyspace field is missing from VitessBackupScheduleStrategy %s", planetscalev2.BackupShard)
-			}
-			if strategy.Shard == "" {
-				return pod, fmt.Errorf("the Shard field is missing from VitessBackupScheduleStrategy %s", planetscalev2.BackupShard)
-			}
-			createVtctldClientCommand(&cmd, vtctldclientServerArg, strategy.ExtraFlags, strategy.Keyspace, strategy.Shard)
-		case planetscalev2.BackupKeyspace:
-			if strategy.Keyspace == "" {
-				return pod, fmt.Errorf("the Keyspace field is missing from VitessBackupScheduleStrategy %s", planetscalev2.BackupKeyspace)
-			}
-			shards, err := r.getAllShardsInKeyspace(ctx, vbsc.Namespace, vbsc.Spec.Cluster, strategy.Keyspace)
-			if err != nil {
-				return corev1.PodSpec{}, err
-			}
-			for j, shard := range shards {
-				addNewCmd(j)
-				createVtctldClientCommand(&cmd, vtctldclientServerArg, strategy.ExtraFlags, strategy.Keyspace, shard)
-			}
-		case planetscalev2.BackupCluster:
-			keyspaces, err := r.getAllShardsInCluster(ctx, vbsc.Namespace, vbsc.Spec.Cluster)
-			if err != nil {
-				return corev1.PodSpec{}, err
-			}
-			for ksIndex, ks := range keyspaces {
-				for shardIndex, shard := range ks.shards {
-					if shardIndex > 0 || ksIndex > 0 {
-						cmd.WriteString(" && ")
-					}
-					createVtctldClientCommand(&cmd, vtctldclientServerArg, strategy.ExtraFlags, ks.name, shard)
-				}
-			}
-		}
-
-	}
+	cmd.WriteString("sleep 10000")
+	// addNewCmd := func(i int) {
+	// 	if i > 0 {
+	// 		cmd.WriteString(" && ")
+	// 	}
+	// }
+	//
+	// for i, strategy := range vbsc.Spec.Strategy {
+	// 	vtctldclientServerArg, err := getVtctldServiceName(vbsc.Spec.Cluster)
+	// 	if err != nil {
+	// 		return corev1.PodSpec{}, err
+	// 	}
+	//
+	// 	addNewCmd(i)
+	// 	switch strategy.Name {
+	// 	case planetscalev2.BackupShard:
+	// 		if strategy.Keyspace == "" {
+	// 			return pod, fmt.Errorf("the Keyspace field is missing from VitessBackupScheduleStrategy %s", planetscalev2.BackupShard)
+	// 		}
+	// 		if strategy.Shard == "" {
+	// 			return pod, fmt.Errorf("the Shard field is missing from VitessBackupScheduleStrategy %s", planetscalev2.BackupShard)
+	// 		}
+	// 		createVtctldClientCommand(&cmd, vtctldclientServerArg, strategy.ExtraFlags, strategy.Keyspace, strategy.Shard)
+	// 	case planetscalev2.BackupKeyspace:
+	// 		if strategy.Keyspace == "" {
+	// 			return pod, fmt.Errorf("the Keyspace field is missing from VitessBackupScheduleStrategy %s", planetscalev2.BackupKeyspace)
+	// 		}
+	// 		shards, err := r.getAllShardsInKeyspace(ctx, vbsc.Namespace, vbsc.Spec.Cluster, strategy.Keyspace)
+	// 		if err != nil {
+	// 			return corev1.PodSpec{}, err
+	// 		}
+	// 		for j, shard := range shards {
+	// 			addNewCmd(j)
+	// 			createVtctldClientCommand(&cmd, vtctldclientServerArg, strategy.ExtraFlags, strategy.Keyspace, shard)
+	// 		}
+	// 	case planetscalev2.BackupCluster:
+	// 		keyspaces, err := r.getAllShardsInCluster(ctx, vbsc.Namespace, vbsc.Spec.Cluster)
+	// 		if err != nil {
+	// 			return corev1.PodSpec{}, err
+	// 		}
+	// 		for ksIndex, ks := range keyspaces {
+	// 			for shardIndex, shard := range ks.shards {
+	// 				if shardIndex > 0 || ksIndex > 0 {
+	// 					cmd.WriteString(" && ")
+	// 				}
+	// 				createVtctldClientCommand(&cmd, vtctldclientServerArg, strategy.ExtraFlags, ks.name, shard)
+	// 			}
+	// 		}
+	// 	}
+	//
+	// }
 
 	pod = corev1.PodSpec{
 		Containers: []corev1.Container{{
2 changes: 2 additions & 0 deletions pkg/operator/metrics/metrics.go
@@ -34,6 +34,8 @@ const (
 	ShardLabel = "shard"
 	// BackupStorageLabel is the label whose value gives the name of a VitessBackupStorage object.
 	BackupStorageLabel = "backup_storage"
+	// BackupScheduleLabel is the label whose value gives the name of a VitessBackupSchedule object.
+	BackupScheduleLabel = "backup_schedule"
 
 	// ResultLabel is a common metrics label for the success/failure of an operation.
 	ResultLabel = "result"
