Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop reconciliation and clustering #578

Merged
merged 16 commits into from
Dec 6, 2023
14 changes: 9 additions & 5 deletions api/v1beta2/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,11 +602,13 @@ type MySQLClusterStatus struct {
}

const (
ConditionInitialized string = "Initialized"
ConditionAvailable string = "Available"
ConditionHealthy string = "Healthy"
ConditionStatefulSetReady string = "StatefulSetReady"
ConditionReconcileSuccess string = "ReconcileSuccess"
ConditionInitialized string = "Initialized"
ConditionAvailable string = "Available"
ConditionHealthy string = "Healthy"
ConditionStatefulSetReady string = "StatefulSetReady"
ConditionReconcileSuccess string = "ReconcileSuccess"
ConditionReconciliationActive string = "ReconciliationActive"
ConditionClusteringActive string = "ClusteringActive"
)

// BackupStatus represents the status of the last successful backup.
Expand Down Expand Up @@ -665,6 +667,8 @@ type ReconcileInfo struct {
// +kubebuilder:printcolumn:name="Primary",type="integer",JSONPath=".status.currentPrimaryIndex"
// +kubebuilder:printcolumn:name="Synced replicas",type="integer",JSONPath=".status.syncedReplicas"
// +kubebuilder:printcolumn:name="Errant replicas",type="integer",JSONPath=".status.errantReplicas"
// +kubebuilder:printcolumn:name="Clustering Active",type="string",JSONPath=".status.conditions[?(@.type=='ClusteringActive')].status"
// +kubebuilder:printcolumn:name="Reconcile Active",type="string",JSONPath=".status.conditions[?(@.type=='ReconciliationActive')].status"
// +kubebuilder:printcolumn:name="Last backup",type="string",JSONPath=".status.backup.time"

// MySQLCluster is the Schema for the mysqlclusters API
Expand Down
6 changes: 6 additions & 0 deletions charts/moco/templates/generated/crds/moco_crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2099,6 +2099,12 @@ spec:
- jsonPath: .status.errantReplicas
name: Errant replicas
type: integer
- jsonPath: .status.conditions[?(@.type=='ClusteringActive')].status
name: Clustering Active
type: string
- jsonPath: .status.conditions[?(@.type=='ReconciliationActive')].status
name: Reconcile Active
type: string
- jsonPath: .status.backup.time
name: Last backup
type: string
Expand Down
15 changes: 15 additions & 0 deletions clustering/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ClusterManager interface {
UpdateNoStart(types.NamespacedName, string)
Stop(types.NamespacedName)
StopAll()
Pause(types.NamespacedName)
}

func NewClusterManager(interval time.Duration, m manager.Manager, opf dbop.OperatorFactory, af AgentFactory, log logr.Logger) ClusterManager {
Expand Down Expand Up @@ -122,3 +123,17 @@ func (m *clusterManager) StopAll() {
m.wg.Wait()
m.stopped = true
}

// Pause halts the manager process for the given cluster and forgets it.
// Unlike Stop, the metrics held by the manager are not deleted.
// It does nothing when no process is registered for the cluster.
func (m *clusterManager) Pause(name types.NamespacedName) {
	id := name.String()

	m.mu.Lock()
	defer m.mu.Unlock()

	proc, found := m.processes[id]
	if !found {
		return
	}

	proc.Pause()
	delete(m.processes, id)
}
30 changes: 30 additions & 0 deletions clustering/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clustering
import (
"context"
"fmt"
"math"
"time"

mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
Expand Down Expand Up @@ -49,10 +50,12 @@ type managerProcess struct {
agentf AgentFactory
name types.NamespacedName
cancel func()
pause bool

ch chan string
metrics metricsSet
deleteMetrics func()
pauseMetrics func()
}

func newManagerProcess(c client.Client, r client.Reader, recorder record.EventRecorder, dbf dbop.OperatorFactory, agentf AgentFactory, name types.NamespacedName, cancel func()) *managerProcess {
Expand Down Expand Up @@ -101,6 +104,12 @@ func newManagerProcess(c client.Client, r client.Reader, recorder record.EventRe
metrics.BackupWorkDirUsage.DeleteLabelValues(name.Name, name.Namespace)
metrics.BackupWarnings.DeleteLabelValues(name.Name, name.Namespace)
},
pauseMetrics: func() {
metrics.AvailableVec.WithLabelValues(name.Name, name.Namespace).Set(math.NaN())
metrics.HealthyVec.WithLabelValues(name.Name, name.Namespace).Set(math.NaN())
metrics.ReadyReplicasVec.WithLabelValues(name.Name, name.Namespace).Set(math.NaN())
metrics.ErrantReplicasVec.WithLabelValues(name.Name, name.Namespace).Set(math.NaN())
},
}
}

Expand All @@ -115,10 +124,22 @@ func (p *managerProcess) Cancel() {
p.cancel()
}

// Pause stops the manager process while keeping its metrics registered.
// It records the pause request in p.pause and then calls the cancel function.
// Unlike Cancel, it does not delete metrics: when Start returns with p.pause
// set, its deferred cleanup calls pauseMetrics, which sets the available,
// healthy, ready-replicas and errant-replicas gauges to NaN, instead of
// calling deleteMetrics.
func (p *managerProcess) Pause() {
// Set the flag before cancelling so that Start's deferred cleanup sees it
// (presumably cancel is what makes Start return; confirm against Start's loop).
p.pause = true
p.cancel()
}

func (p *managerProcess) Start(ctx context.Context, rootLog logr.Logger, interval time.Duration) {
tick := time.NewTicker(interval)
defer func() {
tick.Stop()
if p.pause {
p.pauseMetrics()
return
}
p.deleteMetrics()
}()

Expand Down Expand Up @@ -266,6 +287,15 @@ func (p *managerProcess) updateStatus(ctx context.Context, ss *StatusSet) error
meta.SetStatusCondition(&cluster.Status.Conditions, updateCond(mocov1beta2.ConditionAvailable, available))
meta.SetStatusCondition(&cluster.Status.Conditions, updateCond(mocov1beta2.ConditionHealthy, healthy))

meta.SetStatusCondition(&cluster.Status.Conditions,
metav1.Condition{
Type: mocov1beta2.ConditionClusteringActive,
Status: metav1.ConditionTrue,
Reason: "ClusteringActive",
Message: "clustering is active",
},
)

if available == metav1.ConditionTrue {
p.metrics.available.Set(1)
} else {
Expand Down
103 changes: 103 additions & 0 deletions cmd/kubectl-moco/cmd/start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package cmd

import (
"context"
"fmt"
"os"

mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
"github.com/cybozu-go/moco/pkg/constants"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
)

// init wires the "start" command into the root command and attaches its
// "clustering" and "reconciliation" subcommands.
func init() {
	rootCmd.AddCommand(startCmd)
	startCmd.AddCommand(startClusteringCmd, startReconciliationCmd)
}

// startCmd is the parent "start" command. It defines no run function of its
// own; init attaches the "clustering" and "reconciliation" subcommands to it.
var startCmd = &cobra.Command{
Use: "start",
Short: "Starts the MySQLCluster reconciliation or clustering",
Long: "The start command is used to start the reconciliation or clustering of MySQLCluster",
}

// startClusteringCmd implements "start clustering CLUSTER_NAME".
// It accepts exactly one argument, the cluster name, and hands it to
// startClustering. Shell-completion values come from mysqlClusterCandidates.
var startClusteringCmd = &cobra.Command{
	Use:   "clustering CLUSTER_NAME",
	Short: "Start the specified MySQLCluster's clustering",
	Long:  "start clustering is a command to start the clustering of the specified MySQLCluster. It requires the cluster name as the parameter.",
	Args:  cobra.ExactArgs(1),
	ValidArgsFunction: func(c *cobra.Command, given []string, prefix string) ([]string, cobra.ShellCompDirective) {
		return mysqlClusterCandidates(c.Context(), c, given, prefix)
	},
	RunE: func(c *cobra.Command, positional []string) error {
		clusterName := positional[0]
		return startClustering(c.Context(), clusterName)
	},
}

func startClustering(ctx context.Context, name string) error {
cluster := &mocov1beta2.MySQLCluster{}
if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cluster); err != nil {
return err
}

orig := cluster.DeepCopy()

if ann, ok := cluster.Annotations[constants.AnnClusteringStopped]; ok && ann == "true" {
delete(cluster.Annotations, constants.AnnClusteringStopped)
}

if equality.Semantic.DeepEqual(orig, cluster) {
fmt.Fprintf(os.Stdout, "The clustering is already running.")
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want a message like "The clustering is already running."

}

if err := kubeClient.Update(ctx, cluster); err != nil {
return fmt.Errorf("failed to start clustering of MySQLCluster: %w", err)
}

fmt.Fprintf(os.Stdout, "started clustering of MySQLCluster %q\n", fmt.Sprintf("%s/%s", namespace, name))

return nil
}

// startReconciliationCmd implements "start reconciliation CLUSTER_NAME".
// It accepts exactly one argument, the cluster name, and hands it to
// startReconciliation. Shell-completion values come from
// mysqlClusterCandidates.
var startReconciliationCmd = &cobra.Command{
	Use:   "reconciliation CLUSTER_NAME",
	Short: "Start the specified MySQLCluster's reconciliation",
	Long:  "start reconciliation is a command to start the reconciliation process for the specified MySQLCluster. This requires the cluster name as the parameter.",
	Args:  cobra.ExactArgs(1),
	ValidArgsFunction: func(c *cobra.Command, given []string, prefix string) ([]string, cobra.ShellCompDirective) {
		return mysqlClusterCandidates(c.Context(), c, given, prefix)
	},
	RunE: func(c *cobra.Command, positional []string) error {
		clusterName := positional[0]
		return startReconciliation(c.Context(), clusterName)
	},
}

func startReconciliation(ctx context.Context, name string) error {
cluster := &mocov1beta2.MySQLCluster{}
if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cluster); err != nil {
return err
}

orig := cluster.DeepCopy()

if ann, ok := cluster.Annotations[constants.AnnReconciliationStopped]; ok && ann == "true" {
delete(cluster.Annotations, constants.AnnReconciliationStopped)
}

if equality.Semantic.DeepEqual(orig, cluster) {
fmt.Fprintf(os.Stdout, "The reconciliation is already running.")
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.
I want a message like "The reconciliation is already running."

}

if err := kubeClient.Update(ctx, cluster); err != nil {
return fmt.Errorf("failed to start reconciliation of MySQLCluster: %w", err)
}

fmt.Fprintf(os.Stdout, "started reconciliation of MySQLCluster %q\n", fmt.Sprintf("%s/%s", namespace, name))

return nil
}
105 changes: 105 additions & 0 deletions cmd/kubectl-moco/cmd/stop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package cmd

import (
"context"
"fmt"
"os"

mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
"github.com/cybozu-go/moco/pkg/constants"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
)

// init wires the "stop" command into the root command and attaches its
// "clustering" and "reconciliation" subcommands.
func init() {
	rootCmd.AddCommand(stopCmd)
	stopCmd.AddCommand(stopClusteringCmd, stopReconciliationCmd)
}

// stopCmd is the parent "stop" command. It defines no run function of its
// own; init attaches the "clustering" and "reconciliation" subcommands to it.
var stopCmd = &cobra.Command{
Use: "stop",
Short: "Stops the MySQLCluster reconciliation or clustering",
Long: "The stop command is used to halt the reconciliation or clustering of MySQLCluster",
}

// stopClusteringCmd implements "stop clustering CLUSTER_NAME".
// It accepts exactly one argument, the cluster name, and hands it to
// stopClustering. Shell-completion values come from mysqlClusterCandidates.
var stopClusteringCmd = &cobra.Command{
	Use:   "clustering CLUSTER_NAME",
	Short: "Stop the specified MySQLCluster's clustering",
	Long:  "stop clustering is a command to stop the clustering of the specified MySQLCluster. It requires the cluster name as the parameter.",
	Args:  cobra.ExactArgs(1),
	ValidArgsFunction: func(c *cobra.Command, given []string, prefix string) ([]string, cobra.ShellCompDirective) {
		return mysqlClusterCandidates(c.Context(), c, given, prefix)
	},
	RunE: func(c *cobra.Command, positional []string) error {
		clusterName := positional[0]
		return stopClustering(c.Context(), clusterName)
	},
}

func stopClustering(ctx context.Context, name string) error {
cluster := &mocov1beta2.MySQLCluster{}
if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cluster); err != nil {
return err
}

orig := cluster.DeepCopy()

if cluster.Annotations == nil {
cluster.Annotations = make(map[string]string)
}
cluster.Annotations[constants.AnnClusteringStopped] = "true"

if equality.Semantic.DeepEqual(orig, cluster) {
fmt.Fprintf(os.Stdout, "The clustering is already stopped.")
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}

if err := kubeClient.Update(ctx, cluster); err != nil {
return fmt.Errorf("failed to stop clustering of MySQLCluster: %w", err)
}

fmt.Fprintf(os.Stdout, "stopped clustering of MySQLCluster %q\n", fmt.Sprintf("%s/%s", namespace, name))

return nil
}

// stopReconciliationCmd implements "stop reconciliation CLUSTER_NAME".
// It accepts exactly one argument, the cluster name, and hands it to
// stopReconciliation. Shell-completion values come from
// mysqlClusterCandidates.
var stopReconciliationCmd = &cobra.Command{
	Use:   "reconciliation CLUSTER_NAME",
	Short: "Stop the specified MySQLCluster's reconciliation",
	Long:  "stop reconciliation is a command to stop the reconciliation process for the specified MySQLCluster. This requires the cluster name as the parameter.",
	Args:  cobra.ExactArgs(1),
	ValidArgsFunction: func(c *cobra.Command, given []string, prefix string) ([]string, cobra.ShellCompDirective) {
		return mysqlClusterCandidates(c.Context(), c, given, prefix)
	},
	RunE: func(c *cobra.Command, positional []string) error {
		clusterName := positional[0]
		return stopReconciliation(c.Context(), clusterName)
	},
}

func stopReconciliation(ctx context.Context, name string) error {
cluster := &mocov1beta2.MySQLCluster{}
if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cluster); err != nil {
return err
}

orig := cluster.DeepCopy()

if cluster.Annotations == nil {
cluster.Annotations = make(map[string]string)
}
cluster.Annotations[constants.AnnReconciliationStopped] = "true"

if equality.Semantic.DeepEqual(orig, cluster) {
fmt.Fprintf(os.Stdout, "The reconciliation is already stopped.")
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}

if err := kubeClient.Update(ctx, cluster); err != nil {
return fmt.Errorf("failed to stop reconciliation of MySQLCluster: %w", err)
}

fmt.Fprintf(os.Stdout, "stopped reconciliation of MySQLCluster %q\n", fmt.Sprintf("%s/%s", namespace, name))

return nil
}
6 changes: 6 additions & 0 deletions config/crd/bases/moco.cybozu.com_mysqlclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ spec:
- jsonPath: .status.errantReplicas
name: Errant replicas
type: integer
- jsonPath: .status.conditions[?(@.type=='ClusteringActive')].status
name: Clustering Active
type: string
- jsonPath: .status.conditions[?(@.type=='ReconciliationActive')].status
name: Reconcile Active
type: string
- jsonPath: .status.backup.time
name: Last backup
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ spec:
- jsonPath: .status.errantReplicas
name: Errant replicas
type: integer
- jsonPath: .status.conditions[?(@.type=='ClusteringActive')].status
name: Clustering Active
type: string
- jsonPath: .status.conditions[?(@.type=='ReconciliationActive')].status
name: Reconcile Active
type: string
- jsonPath: .status.backup.time
name: Last backup
type: string
Expand Down
7 changes: 7 additions & 0 deletions controllers/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ func (m *mockManager) Stop(key types.NamespacedName) {

func (m *mockManager) StopAll() {}

// Pause removes the entry for key from the mock's cluster set while holding
// the mutex.
func (m *mockManager) Pause(key types.NamespacedName) {
	id := key.String()

	m.mu.Lock()
	delete(m.clusters, id)
	m.mu.Unlock()
}

func (m *mockManager) getKeys() map[string]bool {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
Loading