Skip to content

Commit

Permalink
Stop reconciliation and clustering (#578)
Browse files Browse the repository at this point in the history
* Stop reconciliation and clustering

Signed-off-by: d-kuro <[email protected]>
  • Loading branch information
d-kuro authored Dec 6, 2023
1 parent deb2272 commit cfdd1ff
Show file tree
Hide file tree
Showing 19 changed files with 954 additions and 8 deletions.
14 changes: 9 additions & 5 deletions api/v1beta2/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,11 +602,13 @@ type MySQLClusterStatus struct {
}

const (
ConditionInitialized string = "Initialized"
ConditionAvailable string = "Available"
ConditionHealthy string = "Healthy"
ConditionStatefulSetReady string = "StatefulSetReady"
ConditionReconcileSuccess string = "ReconcileSuccess"
ConditionInitialized string = "Initialized"
ConditionAvailable string = "Available"
ConditionHealthy string = "Healthy"
ConditionStatefulSetReady string = "StatefulSetReady"
ConditionReconcileSuccess string = "ReconcileSuccess"
ConditionReconciliationActive string = "ReconciliationActive"
ConditionClusteringActive string = "ClusteringActive"
)

// BackupStatus represents the status of the last successful backup.
Expand Down Expand Up @@ -669,6 +671,8 @@ type ReconcileInfo struct {
// +kubebuilder:printcolumn:name="Primary",type="integer",JSONPath=".status.currentPrimaryIndex"
// +kubebuilder:printcolumn:name="Synced replicas",type="integer",JSONPath=".status.syncedReplicas"
// +kubebuilder:printcolumn:name="Errant replicas",type="integer",JSONPath=".status.errantReplicas"
// +kubebuilder:printcolumn:name="Clustering Active",type="string",JSONPath=".status.conditions[?(@.type=='ClusteringActive')].status"
// +kubebuilder:printcolumn:name="Reconcile Active",type="string",JSONPath=".status.conditions[?(@.type=='ReconciliationActive')].status"
// +kubebuilder:printcolumn:name="Last backup",type="string",JSONPath=".status.backup.time"

// MySQLCluster is the Schema for the mysqlclusters API
Expand Down
6 changes: 6 additions & 0 deletions charts/moco/templates/generated/crds/moco_crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2099,6 +2099,12 @@ spec:
- jsonPath: .status.errantReplicas
name: Errant replicas
type: integer
- jsonPath: .status.conditions[?(@.type=='ClusteringActive')].status
name: Clustering Active
type: string
- jsonPath: .status.conditions[?(@.type=='ReconciliationActive')].status
name: Reconcile Active
type: string
- jsonPath: .status.backup.time
name: Last backup
type: string
Expand Down
15 changes: 15 additions & 0 deletions clustering/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ClusterManager interface {
UpdateNoStart(types.NamespacedName, string)
Stop(types.NamespacedName)
StopAll()
Pause(types.NamespacedName)
}

func NewClusterManager(interval time.Duration, m manager.Manager, opf dbop.OperatorFactory, af AgentFactory, log logr.Logger) ClusterManager {
Expand Down Expand Up @@ -122,3 +123,17 @@ func (m *clusterManager) StopAll() {
m.wg.Wait()
m.stopped = true
}

// Pause halts the manager process for the cluster.
// Unlike Stop, the metrics held by the manager are not deleted.
func (m *clusterManager) Pause(name types.NamespacedName) {
m.mu.Lock()
defer m.mu.Unlock()

key := name.String()
p, ok := m.processes[key]
if ok {
p.Pause()
delete(m.processes, key)
}
}
30 changes: 30 additions & 0 deletions clustering/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clustering
import (
"context"
"fmt"
"math"
"time"

mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
Expand Down Expand Up @@ -49,10 +50,12 @@ type managerProcess struct {
agentf AgentFactory
name types.NamespacedName
cancel func()
pause bool

ch chan string
metrics metricsSet
deleteMetrics func()
pauseMetrics func()
}

func newManagerProcess(c client.Client, r client.Reader, recorder record.EventRecorder, dbf dbop.OperatorFactory, agentf AgentFactory, name types.NamespacedName, cancel func()) *managerProcess {
Expand Down Expand Up @@ -101,6 +104,12 @@ func newManagerProcess(c client.Client, r client.Reader, recorder record.EventRe
metrics.BackupWorkDirUsage.DeleteLabelValues(name.Name, name.Namespace)
metrics.BackupWarnings.DeleteLabelValues(name.Name, name.Namespace)
},
pauseMetrics: func() {
metrics.AvailableVec.WithLabelValues(name.Name, name.Namespace).Set(math.NaN())
metrics.HealthyVec.WithLabelValues(name.Name, name.Namespace).Set(math.NaN())
metrics.ReadyReplicasVec.WithLabelValues(name.Name, name.Namespace).Set(math.NaN())
metrics.ErrantReplicasVec.WithLabelValues(name.Name, name.Namespace).Set(math.NaN())
},
}
}

Expand All @@ -115,10 +124,22 @@ func (p *managerProcess) Cancel() {
p.cancel()
}

// Pause pauses the manager process.
// Unlike Cancel, it does not delete metrics.
// Also, it sets NaN to some metrics.
func (p *managerProcess) Pause() {
p.pause = true
p.cancel()
}

func (p *managerProcess) Start(ctx context.Context, rootLog logr.Logger, interval time.Duration) {
tick := time.NewTicker(interval)
defer func() {
tick.Stop()
if p.pause {
p.pauseMetrics()
return
}
p.deleteMetrics()
}()

Expand Down Expand Up @@ -266,6 +287,15 @@ func (p *managerProcess) updateStatus(ctx context.Context, ss *StatusSet) error
meta.SetStatusCondition(&cluster.Status.Conditions, updateCond(mocov1beta2.ConditionAvailable, available))
meta.SetStatusCondition(&cluster.Status.Conditions, updateCond(mocov1beta2.ConditionHealthy, healthy))

meta.SetStatusCondition(&cluster.Status.Conditions,
metav1.Condition{
Type: mocov1beta2.ConditionClusteringActive,
Status: metav1.ConditionTrue,
Reason: "ClusteringActive",
Message: "clustering is active",
},
)

if available == metav1.ConditionTrue {
p.metrics.available.Set(1)
} else {
Expand Down
103 changes: 103 additions & 0 deletions cmd/kubectl-moco/cmd/start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package cmd

import (
"context"
"fmt"
"os"

mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
"github.com/cybozu-go/moco/pkg/constants"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
)

func init() {
rootCmd.AddCommand(startCmd)
startCmd.AddCommand(startClusteringCmd)
startCmd.AddCommand(startReconciliationCmd)
}

var startCmd = &cobra.Command{
Use: "start",
Short: "Starts the MySQLCluster reconciliation or clustering",
Long: "The start command is used to start the reconciliation or clustering of MySQLCluster",
}

var startClusteringCmd = &cobra.Command{
Use: "clustering CLUSTER_NAME",
Short: "Start the specified MySQLCluster's clustering",
Long: "start clustering is a command to start the clustering of the specified MySQLCluster. It requires the cluster name as the parameter.",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
return startClustering(cmd.Context(), args[0])
},
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return mysqlClusterCandidates(cmd.Context(), cmd, args, toComplete)
},
}

func startClustering(ctx context.Context, name string) error {
cluster := &mocov1beta2.MySQLCluster{}
if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cluster); err != nil {
return err
}

orig := cluster.DeepCopy()

if ann, ok := cluster.Annotations[constants.AnnClusteringStopped]; ok && ann == "true" {
delete(cluster.Annotations, constants.AnnClusteringStopped)
}

if equality.Semantic.DeepEqual(orig, cluster) {
fmt.Fprintf(os.Stdout, "The clustering is already running.")
return nil
}

if err := kubeClient.Update(ctx, cluster); err != nil {
return fmt.Errorf("failed to start clustering of MySQLCluster: %w", err)
}

fmt.Fprintf(os.Stdout, "started clustering of MySQLCluster %q\n", fmt.Sprintf("%s/%s", namespace, name))

return nil
}

var startReconciliationCmd = &cobra.Command{
Use: "reconciliation CLUSTER_NAME",
Short: "Start the specified MySQLCluster's reconciliation",
Long: "start reconciliation is a command to start the reconciliation process for the specified MySQLCluster. This requires the cluster name as the parameter.",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
return startReconciliation(cmd.Context(), args[0])
},
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return mysqlClusterCandidates(cmd.Context(), cmd, args, toComplete)
},
}

func startReconciliation(ctx context.Context, name string) error {
cluster := &mocov1beta2.MySQLCluster{}
if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cluster); err != nil {
return err
}

orig := cluster.DeepCopy()

if ann, ok := cluster.Annotations[constants.AnnReconciliationStopped]; ok && ann == "true" {
delete(cluster.Annotations, constants.AnnReconciliationStopped)
}

if equality.Semantic.DeepEqual(orig, cluster) {
fmt.Fprintf(os.Stdout, "The reconciliation is already running.")
return nil
}

if err := kubeClient.Update(ctx, cluster); err != nil {
return fmt.Errorf("failed to start reconciliation of MySQLCluster: %w", err)
}

fmt.Fprintf(os.Stdout, "started reconciliation of MySQLCluster %q\n", fmt.Sprintf("%s/%s", namespace, name))

return nil
}
105 changes: 105 additions & 0 deletions cmd/kubectl-moco/cmd/stop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package cmd

import (
"context"
"fmt"
"os"

mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
"github.com/cybozu-go/moco/pkg/constants"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
)

func init() {
rootCmd.AddCommand(stopCmd)
stopCmd.AddCommand(stopClusteringCmd)
stopCmd.AddCommand(stopReconciliationCmd)
}

var stopCmd = &cobra.Command{
Use: "stop",
Short: "Stops the MySQLCluster reconciliation or clustering",
Long: "The stop command is used to halt the reconciliation or clustering of MySQLCluster",
}

var stopClusteringCmd = &cobra.Command{
Use: "clustering CLUSTER_NAME",
Short: "Stop the specified MySQLCluster's clustering",
Long: "stop clustering is a command to stop the clustering of the specified MySQLCluster. It requires the cluster name as the parameter.",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
return stopClustering(cmd.Context(), args[0])
},
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return mysqlClusterCandidates(cmd.Context(), cmd, args, toComplete)
},
}

func stopClustering(ctx context.Context, name string) error {
cluster := &mocov1beta2.MySQLCluster{}
if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cluster); err != nil {
return err
}

orig := cluster.DeepCopy()

if cluster.Annotations == nil {
cluster.Annotations = make(map[string]string)
}
cluster.Annotations[constants.AnnClusteringStopped] = "true"

if equality.Semantic.DeepEqual(orig, cluster) {
fmt.Fprintf(os.Stdout, "The clustering is already stopped.")
return nil
}

if err := kubeClient.Update(ctx, cluster); err != nil {
return fmt.Errorf("failed to stop clustering of MySQLCluster: %w", err)
}

fmt.Fprintf(os.Stdout, "stopped clustering of MySQLCluster %q\n", fmt.Sprintf("%s/%s", namespace, name))

return nil
}

var stopReconciliationCmd = &cobra.Command{
Use: "reconciliation CLUSTER_NAME",
Short: "Stop the specified MySQLCluster's reconciliation",
Long: "stop reconciliation is a command to stop the reconciliation process for the specified MySQLCluster. This requires the cluster name as the parameter.",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
return stopReconciliation(cmd.Context(), args[0])
},
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return mysqlClusterCandidates(cmd.Context(), cmd, args, toComplete)
},
}

func stopReconciliation(ctx context.Context, name string) error {
cluster := &mocov1beta2.MySQLCluster{}
if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cluster); err != nil {
return err
}

orig := cluster.DeepCopy()

if cluster.Annotations == nil {
cluster.Annotations = make(map[string]string)
}
cluster.Annotations[constants.AnnReconciliationStopped] = "true"

if equality.Semantic.DeepEqual(orig, cluster) {
fmt.Fprintf(os.Stdout, "The reconciliation is already stopped.")
return nil
}

if err := kubeClient.Update(ctx, cluster); err != nil {
return fmt.Errorf("failed to stop reconciliation of MySQLCluster: %w", err)
}

fmt.Fprintf(os.Stdout, "stopped reconciliation of MySQLCluster %q\n", fmt.Sprintf("%s/%s", namespace, name))

return nil
}
6 changes: 6 additions & 0 deletions config/crd/bases/moco.cybozu.com_mysqlclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ spec:
- jsonPath: .status.errantReplicas
name: Errant replicas
type: integer
- jsonPath: .status.conditions[?(@.type=='ClusteringActive')].status
name: Clustering Active
type: string
- jsonPath: .status.conditions[?(@.type=='ReconciliationActive')].status
name: Reconcile Active
type: string
- jsonPath: .status.backup.time
name: Last backup
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ spec:
- jsonPath: .status.errantReplicas
name: Errant replicas
type: integer
- jsonPath: .status.conditions[?(@.type=='ClusteringActive')].status
name: Clustering Active
type: string
- jsonPath: .status.conditions[?(@.type=='ReconciliationActive')].status
name: Reconcile Active
type: string
- jsonPath: .status.backup.time
name: Last backup
type: string
Expand Down
7 changes: 7 additions & 0 deletions controllers/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ func (m *mockManager) Stop(key types.NamespacedName) {

func (m *mockManager) StopAll() {}

func (m *mockManager) Pause(key types.NamespacedName) {
m.mu.Lock()
defer m.mu.Unlock()

delete(m.clusters, key.String())
}

func (m *mockManager) getKeys() map[string]bool {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
Loading

0 comments on commit cfdd1ff

Please sign in to comment.