diff --git a/api/v1beta2/mysqlcluster_types.go b/api/v1beta2/mysqlcluster_types.go index e5ca500de..c054aa028 100644 --- a/api/v1beta2/mysqlcluster_types.go +++ b/api/v1beta2/mysqlcluster_types.go @@ -602,11 +602,13 @@ type MySQLClusterStatus struct { } const ( - ConditionInitialized string = "Initialized" - ConditionAvailable string = "Available" - ConditionHealthy string = "Healthy" - ConditionStatefulSetReady string = "StatefulSetReady" - ConditionReconcileSuccess string = "ReconcileSuccess" + ConditionInitialized string = "Initialized" + ConditionAvailable string = "Available" + ConditionHealthy string = "Healthy" + ConditionStatefulSetReady string = "StatefulSetReady" + ConditionReconcileSuccess string = "ReconcileSuccess" + ConditionReconciliationActive string = "ReconciliationActive" + ConditionClusteringActive string = "ClusteringActive" ) // BackupStatus represents the status of the last successful backup. @@ -669,6 +671,8 @@ type ReconcileInfo struct { // +kubebuilder:printcolumn:name="Primary",type="integer",JSONPath=".status.currentPrimaryIndex" // +kubebuilder:printcolumn:name="Synced replicas",type="integer",JSONPath=".status.syncedReplicas" // +kubebuilder:printcolumn:name="Errant replicas",type="integer",JSONPath=".status.errantReplicas" +// +kubebuilder:printcolumn:name="Clustering Active",type="string",JSONPath=".status.conditions[?(@.type=='ClusteringActive')].status" +// +kubebuilder:printcolumn:name="Reconcile Active",type="string",JSONPath=".status.conditions[?(@.type=='ReconciliationActive')].status" // +kubebuilder:printcolumn:name="Last backup",type="string",JSONPath=".status.backup.time" // MySQLCluster is the Schema for the mysqlclusters API diff --git a/charts/moco/templates/generated/crds/moco_crds.yaml b/charts/moco/templates/generated/crds/moco_crds.yaml index b2c504310..5c5d9620c 100644 --- a/charts/moco/templates/generated/crds/moco_crds.yaml +++ b/charts/moco/templates/generated/crds/moco_crds.yaml @@ -2099,6 +2099,12 @@ spec: - jsonPath: .status.errantReplicas name: Errant replicas type: integer + - jsonPath: .status.conditions[?(@.type=='ClusteringActive')].status + name: Clustering Active + type: string + - jsonPath: .status.conditions[?(@.type=='ReconciliationActive')].status + name: Reconcile Active + type: string - jsonPath: .status.backup.time name: Last backup type: string diff --git a/clustering/manager.go b/clustering/manager.go index 7607daa06..26dd0cc42 100644 --- a/clustering/manager.go +++ b/clustering/manager.go @@ -27,6 +27,7 @@ type ClusterManager interface { UpdateNoStart(types.NamespacedName, string) Stop(types.NamespacedName) StopAll() + Pause(types.NamespacedName) } func NewClusterManager(interval time.Duration, m manager.Manager, opf dbop.OperatorFactory, af AgentFactory, log logr.Logger) ClusterManager { @@ -122,3 +123,17 @@ func (m *clusterManager) StopAll() { m.wg.Wait() m.stopped = true } + +// Pause halts the manager process for the cluster. +// Unlike Stop, the metrics held by the manager are not deleted. +func (m *clusterManager) Pause(name types.NamespacedName) { + m.mu.Lock() + defer m.mu.Unlock() + + key := name.String() + p, ok := m.processes[key] + if ok { + p.Pause() + delete(m.processes, key) + } +} diff --git a/clustering/process.go b/clustering/process.go index baa0533d3..c965791ac 100644 --- a/clustering/process.go +++ b/clustering/process.go @@ -3,6 +3,7 @@ package clustering import ( "context" "fmt" + "math" "time" mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2" @@ -49,10 +50,12 @@ type managerProcess struct { agentf AgentFactory name types.NamespacedName cancel func() + pause bool ch chan string metrics metricsSet deleteMetrics func() + pauseMetrics func() } func newManagerProcess(c client.Client, r client.Reader, recorder record.EventRecorder, dbf dbop.OperatorFactory, agentf AgentFactory, name types.NamespacedName, cancel func()) *managerProcess { @@ -101,6 +104,12 @@ func newManagerProcess(c client.Client, r client.Reader, recorder record.EventRe metrics.BackupWorkDirUsage.DeleteLabelValues(name.Name, name.Namespace) metrics.BackupWarnings.DeleteLabelValues(name.Name, name.Namespace) }, + pauseMetrics: func() { + metrics.AvailableVec.WithLabelValues(name.Name, name.Namespace).Set(math.NaN()) + metrics.HealthyVec.WithLabelValues(name.Name, name.Namespace).Set(math.NaN()) + metrics.ReadyReplicasVec.WithLabelValues(name.Name, name.Namespace).Set(math.NaN()) + metrics.ErrantReplicasVec.WithLabelValues(name.Name, name.Namespace).Set(math.NaN()) + }, } } @@ -115,10 +124,22 @@ func (p *managerProcess) Cancel() { p.cancel() } +// Pause pauses the manager process. +// Unlike Cancel, it does not delete metrics. +// Also, it sets NaN to some metrics. +func (p *managerProcess) Pause() { + p.pause = true + p.cancel() +} + func (p *managerProcess) Start(ctx context.Context, rootLog logr.Logger, interval time.Duration) { tick := time.NewTicker(interval) defer func() { tick.Stop() + if p.pause { + p.pauseMetrics() + return + } p.deleteMetrics() }() @@ -266,6 +287,15 @@ func (p *managerProcess) updateStatus(ctx context.Context, ss *StatusSet) error meta.SetStatusCondition(&cluster.Status.Conditions, updateCond(mocov1beta2.ConditionAvailable, available)) meta.SetStatusCondition(&cluster.Status.Conditions, updateCond(mocov1beta2.ConditionHealthy, healthy)) + meta.SetStatusCondition(&cluster.Status.Conditions, + metav1.Condition{ + Type: mocov1beta2.ConditionClusteringActive, + Status: metav1.ConditionTrue, + Reason: "ClusteringActive", + Message: "clustering is active", + }, + ) + if available == metav1.ConditionTrue { p.metrics.available.Set(1) } else { diff --git a/cmd/kubectl-moco/cmd/start.go b/cmd/kubectl-moco/cmd/start.go new file mode 100644 index 000000000..ddbf9a817 --- /dev/null +++ b/cmd/kubectl-moco/cmd/start.go @@ -0,0 +1,103 @@ +package cmd + +import ( + "context" + "fmt" + "os" + + mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2" + "github.com/cybozu-go/moco/pkg/constants" + "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/types" +) + +func init() { + rootCmd.AddCommand(startCmd) + startCmd.AddCommand(startClusteringCmd) + startCmd.AddCommand(startReconciliationCmd) +} + +var startCmd = &cobra.Command{ + Use: "start", + Short: "Starts the MySQLCluster reconciliation or clustering", + Long: "The start command is used to start the reconciliation or clustering of MySQLCluster", +} + +var startClusteringCmd = &cobra.Command{ + Use: "clustering CLUSTER_NAME", + Short: "Start the specified MySQLCluster's clustering", + Long: "start clustering is a command to start the clustering of the specified MySQLCluster. It requires the cluster name as the parameter.", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return startClustering(cmd.Context(), args[0]) + }, + ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + return mysqlClusterCandidates(cmd.Context(), cmd, args, toComplete) + }, +} + +func startClustering(ctx context.Context, name string) error { + cluster := &mocov1beta2.MySQLCluster{} + if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cluster); err != nil { + return err + } + + orig := cluster.DeepCopy() + + if ann, ok := cluster.Annotations[constants.AnnClusteringStopped]; ok && ann == "true" { + delete(cluster.Annotations, constants.AnnClusteringStopped) + } + + if equality.Semantic.DeepEqual(orig, cluster) { + fmt.Fprintf(os.Stdout, "The clustering is already running.") + return nil + } + + if err := kubeClient.Update(ctx, cluster); err != nil { + return fmt.Errorf("failed to start clustering of MySQLCluster: %w", err) + } + + fmt.Fprintf(os.Stdout, "started clustering of MySQLCluster %q\n", fmt.Sprintf("%s/%s", namespace, name)) + + return nil +} + +var startReconciliationCmd = &cobra.Command{ + Use: "reconciliation CLUSTER_NAME", + Short: "Start the specified MySQLCluster's reconciliation", + Long: "start reconciliation is a command to start the reconciliation process for the specified MySQLCluster. This requires the cluster name as the parameter.", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return startReconciliation(cmd.Context(), args[0]) + }, + ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + return mysqlClusterCandidates(cmd.Context(), cmd, args, toComplete) + }, +} + +func startReconciliation(ctx context.Context, name string) error { + cluster := &mocov1beta2.MySQLCluster{} + if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cluster); err != nil { + return err + } + + orig := cluster.DeepCopy() + + if ann, ok := cluster.Annotations[constants.AnnReconciliationStopped]; ok && ann == "true" { + delete(cluster.Annotations, constants.AnnReconciliationStopped) + } + + if equality.Semantic.DeepEqual(orig, cluster) { + fmt.Fprintf(os.Stdout, "The reconciliation is already running.") + return nil + } + + if err := kubeClient.Update(ctx, cluster); err != nil { + return fmt.Errorf("failed to start reconciliation of MySQLCluster: %w", err) + } + + fmt.Fprintf(os.Stdout, "started reconciliation of MySQLCluster %q\n", fmt.Sprintf("%s/%s", namespace, name)) + + return nil +} diff --git a/cmd/kubectl-moco/cmd/stop.go b/cmd/kubectl-moco/cmd/stop.go new file mode 100644 index 000000000..a1d2796c3 --- /dev/null +++ b/cmd/kubectl-moco/cmd/stop.go @@ -0,0 +1,105 @@ +package cmd + +import ( + "context" + "fmt" + "os" + + mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2" + "github.com/cybozu-go/moco/pkg/constants" + "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/types" +) + +func init() { + rootCmd.AddCommand(stopCmd) + stopCmd.AddCommand(stopClusteringCmd) + stopCmd.AddCommand(stopReconciliationCmd) +} + +var stopCmd = &cobra.Command{ + Use: "stop", + Short: "Stops the MySQLCluster reconciliation or clustering", + Long: "The stop command is used to halt the reconciliation or clustering of MySQLCluster", +} + +var stopClusteringCmd = &cobra.Command{ + Use: "clustering CLUSTER_NAME", + Short: "Stop the specified MySQLCluster's clustering", + Long: "stop clustering is a command to stop the clustering of the specified MySQLCluster. It requires the cluster name as the parameter.", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return stopClustering(cmd.Context(), args[0]) + }, + ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + return mysqlClusterCandidates(cmd.Context(), cmd, args, toComplete) + }, +} + +func stopClustering(ctx context.Context, name string) error { + cluster := &mocov1beta2.MySQLCluster{} + if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cluster); err != nil { + return err + } + + orig := cluster.DeepCopy() + + if cluster.Annotations == nil { + cluster.Annotations = make(map[string]string) + } + cluster.Annotations[constants.AnnClusteringStopped] = "true" + + if equality.Semantic.DeepEqual(orig, cluster) { + fmt.Fprintf(os.Stdout, "The clustering is already stopped.") + return nil + } + + if err := kubeClient.Update(ctx, cluster); err != nil { + return fmt.Errorf("failed to stop clustering of MySQLCluster: %w", err) + } + + fmt.Fprintf(os.Stdout, "stopped clustering of MySQLCluster %q\n", fmt.Sprintf("%s/%s", namespace, name)) + + return nil +} + +var stopReconciliationCmd = &cobra.Command{ + Use: "reconciliation CLUSTER_NAME", + Short: "Stop the specified MySQLCluster's reconciliation", + Long: "stop reconciliation is a command to stop the reconciliation process for the specified MySQLCluster. This requires the cluster name as the parameter.", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return stopReconciliation(cmd.Context(), args[0]) + }, + ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + return mysqlClusterCandidates(cmd.Context(), cmd, args, toComplete) + }, +} + +func stopReconciliation(ctx context.Context, name string) error { + cluster := &mocov1beta2.MySQLCluster{} + if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cluster); err != nil { + return err + } + + orig := cluster.DeepCopy() + + if cluster.Annotations == nil { + cluster.Annotations = make(map[string]string) + } + cluster.Annotations[constants.AnnReconciliationStopped] = "true" + + if equality.Semantic.DeepEqual(orig, cluster) { + fmt.Fprintf(os.Stdout, "The reconciliation is already stopped.") + return nil + } + + if err := kubeClient.Update(ctx, cluster); err != nil { + return fmt.Errorf("failed to stop reconciliation of MySQLCluster: %w", err) + } + + fmt.Fprintf(os.Stdout, "stopped reconciliation of MySQLCluster %q\n", fmt.Sprintf("%s/%s", namespace, name)) + + return nil +} diff --git a/config/crd/bases/moco.cybozu.com_mysqlclusters.yaml b/config/crd/bases/moco.cybozu.com_mysqlclusters.yaml index 404e62087..9fc7161d6 100644 --- a/config/crd/bases/moco.cybozu.com_mysqlclusters.yaml +++ b/config/crd/bases/moco.cybozu.com_mysqlclusters.yaml @@ -30,6 +30,12 @@ spec: - jsonPath: .status.errantReplicas name: Errant replicas type: integer + - jsonPath: .status.conditions[?(@.type=='ClusteringActive')].status + name: Clustering Active + type: string + - jsonPath: .status.conditions[?(@.type=='ReconciliationActive')].status + name: Reconcile Active + type: string - jsonPath: .status.backup.time name: Last backup type: string diff --git a/config/crd/tests/apiextensions.k8s.io_v1_customresourcedefinition_mysqlclusters.moco.cybozu.com.yaml b/config/crd/tests/apiextensions.k8s.io_v1_customresourcedefinition_mysqlclusters.moco.cybozu.com.yaml index cb526ca07..115a1db98 100644 --- a/config/crd/tests/apiextensions.k8s.io_v1_customresourcedefinition_mysqlclusters.moco.cybozu.com.yaml +++ b/config/crd/tests/apiextensions.k8s.io_v1_customresourcedefinition_mysqlclusters.moco.cybozu.com.yaml @@ -30,6 +30,12 @@ spec: - jsonPath: .status.errantReplicas name: Errant replicas type: integer + - jsonPath: .status.conditions[?(@.type=='ClusteringActive')].status + name: Clustering Active + type: string + - jsonPath: .status.conditions[?(@.type=='ReconciliationActive')].status + name: Reconcile Active + type: string - jsonPath: .status.backup.time name: Last backup type: string diff --git a/controllers/mock_test.go b/controllers/mock_test.go index ae40d2229..2054e3c0b 100644 --- a/controllers/mock_test.go +++ b/controllers/mock_test.go @@ -38,6 +38,13 @@ func (m *mockManager) Stop(key types.NamespacedName) { func (m *mockManager) StopAll() {} +func (m *mockManager) Pause(key types.NamespacedName) { + m.mu.Lock() + defer m.mu.Unlock() + + delete(m.clusters, key.String()) +} + func (m *mockManager) getKeys() map[string]bool { m.mu.Lock() defer m.mu.Unlock() diff --git a/controllers/mysqlcluster_controller.go b/controllers/mysqlcluster_controller.go index 58e27d248..95c97fbf1 100644 --- a/controllers/mysqlcluster_controller.go +++ b/controllers/mysqlcluster_controller.go @@ -232,6 +232,24 @@ func (r *MySQLClusterReconciler) reconcileV1(ctx context.Context, req ctrl.Reque return ctrl.Result{}, nil } + if isReconciliationStopped(cluster) { + log.Info("reconciliation is stopped") + + if isClusteringStopped(cluster) { + if err := r.clusteringStopV1(ctx, cluster); err != nil { + return ctrl.Result{}, err + } + } else { + r.ClusterManager.Update(client.ObjectKeyFromObject(cluster), string(controller.ReconcileIDFromContext(ctx))) + metrics.ClusteringStoppedVec.WithLabelValues(cluster.Name, cluster.Namespace).Set(0) + } + + if err := r.reconciliationStopV1(ctx, cluster); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + defer func() { if err2 := r.updateStatus(ctx, cluster, err); err2 != nil { err = err2 @@ -294,7 +312,15 @@ func (r *MySQLClusterReconciler) reconcileV1(ctx context.Context, req ctrl.Reque return ctrl.Result{}, err } + if isClusteringStopped(cluster) { + if err := r.clusteringStopV1(ctx, cluster); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + r.ClusterManager.Update(client.ObjectKeyFromObject(cluster), string(controller.ReconcileIDFromContext(ctx))) + metrics.ClusteringStoppedVec.WithLabelValues(cluster.Name, cluster.Namespace).Set(0) return ctrl.Result{}, nil } @@ -1634,6 +1660,9 @@ func (r *MySQLClusterReconciler) finalizeV1(ctx context.Context, cluster *mocov1 return fmt.Errorf("failed to delete certificate %s: %w", certName, err) } + metrics.ClusteringStoppedVec.DeleteLabelValues(cluster.Name, cluster.Namespace) + metrics.ReconciliationStoppedVec.DeleteLabelValues(cluster.Name, cluster.Namespace) + return nil } @@ -1690,15 +1719,102 @@ func (r *MySQLClusterReconciler) updateStatus(ctx context.Context, cluster *moco }, ) + meta.SetStatusCondition(&cluster.Status.Conditions, + metav1.Condition{ + Type: mocov1beta2.ConditionReconciliationActive, + Status: metav1.ConditionTrue, + Reason: "ReconciliationActive", + Message: "reconciliation is active", + }, + ) + + if !equality.Semantic.DeepEqual(orig, cluster) { + if err := r.Status().Update(ctx, cluster); err != nil { + return err + } + log.Info("update status successfully") + } + + metrics.ReconciliationStoppedVec.WithLabelValues(cluster.Name, cluster.Namespace).Set(0) + + return nil +} + +func (r *MySQLClusterReconciler) clusteringStopV1(ctx context.Context, cluster *mocov1beta2.MySQLCluster) error { + log := crlog.FromContext(ctx) + orig := cluster.DeepCopy() + + if meta.IsStatusConditionFalse(cluster.Status.Conditions, mocov1beta2.ConditionClusteringActive) { + return nil + } + + r.ClusterManager.Pause(types.NamespacedName{Namespace: cluster.Namespace, Name: cluster.Name}) + + for _, cond := range cluster.Status.Conditions { + if cond.Type == mocov1beta2.ConditionAvailable || cond.Type == mocov1beta2.ConditionHealthy { + cond.Status = metav1.ConditionUnknown + meta.SetStatusCondition(&cluster.Status.Conditions, cond) + } + } + + meta.SetStatusCondition(&cluster.Status.Conditions, + metav1.Condition{ + Type: mocov1beta2.ConditionClusteringActive, + Status: metav1.ConditionFalse, + Reason: "ClusteringInactive", + Message: "clustering is inactive", + }, + ) + if !equality.Semantic.DeepEqual(orig, cluster) { if err := r.Status().Update(ctx, cluster); err != nil { return err } log.Info("update status successfully") } + + metrics.ClusteringStoppedVec.WithLabelValues(cluster.Name, cluster.Namespace).Set(1) + return nil } +func (r *MySQLClusterReconciler) reconciliationStopV1(ctx context.Context, cluster *mocov1beta2.MySQLCluster) error { + log := crlog.FromContext(ctx) + orig := cluster.DeepCopy() + + if meta.IsStatusConditionFalse(cluster.Status.Conditions, mocov1beta2.ConditionReconciliationActive) { + return nil + } + + meta.SetStatusCondition(&cluster.Status.Conditions, + metav1.Condition{ + Type: mocov1beta2.ConditionReconciliationActive, + Status: metav1.ConditionFalse, + Reason: "ReconciliationInactive", + Message: "reconciliation is inactive", + }, + ) + + if !equality.Semantic.DeepEqual(orig, cluster) { + if err := r.Status().Update(ctx, cluster); err != nil { + return err + } + log.Info("update status successfully") + } + + metrics.ReconciliationStoppedVec.WithLabelValues(cluster.Name, cluster.Namespace).Set(1) + + return nil +} + +func isReconciliationStopped(cluster *mocov1beta2.MySQLCluster) bool { + return cluster.Annotations[constants.AnnReconciliationStopped] == "true" +} + +func isClusteringStopped(cluster *mocov1beta2.MySQLCluster) bool { + return cluster.Annotations[constants.AnnClusteringStopped] == "true" +} + func setControllerReferenceWithConfigMap(cluster *mocov1beta2.MySQLCluster, cm *corev1ac.ConfigMapApplyConfiguration, scheme *runtime.Scheme) error { gvk, err := apiutil.GVKForObject(cluster, scheme) if err != nil { diff --git a/controllers/mysqlcluster_controller_test.go b/controllers/mysqlcluster_controller_test.go index 1fd49944e..5746b6bb4 100644 --- a/controllers/mysqlcluster_controller_test.go +++ b/controllers/mysqlcluster_controller_test.go @@ -2017,4 +2017,64 @@ var _ = Describe("MySQLCluster reconciler", func() { return nil }).Should(Succeed()) }) + + It("should reconciliation stopped", func() { + cluster := testNewMySQLCluster("test") + err := k8sClient.Create(ctx, cluster) + Expect(err).NotTo(HaveOccurred()) + + By("setting reconcile stop annotation") + if cluster.Annotations == nil { + cluster.Annotations = map[string]string{} + } + cluster.Annotations[constants.AnnReconciliationStopped] = "true" + err = k8sClient.Update(ctx, cluster) + Expect(err).NotTo(HaveOccurred()) + + By("checking condition is false") + Eventually(func() error { + cluster = &mocov1beta2.MySQLCluster{} + if err = k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: "test"}, cluster); err != nil { + return err + } + cond := meta.FindStatusCondition(cluster.Status.Conditions, mocov1beta2.ConditionReconciliationActive) + if cond == nil { + return fmt.Errorf("condition does not exists") + } + if cond.Status != metav1.ConditionFalse { + return fmt.Errorf("condition is not false") + } + return nil + }).Should(Succeed()) + }) + + It("should clustering stopped", func() { + cluster := testNewMySQLCluster("test") + err := k8sClient.Create(ctx, cluster) + Expect(err).NotTo(HaveOccurred()) + + By("setting clustering stop annotation") + if cluster.Annotations == nil { + cluster.Annotations = map[string]string{} + } + cluster.Annotations[constants.AnnClusteringStopped] = "true" + err = k8sClient.Update(ctx, cluster) + Expect(err).NotTo(HaveOccurred()) + + By("checking condition is false") + Eventually(func() error { + cluster = &mocov1beta2.MySQLCluster{} + if err = k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: "test"}, cluster); err != nil { + return err + } + cond := meta.FindStatusCondition(cluster.Status.Conditions, mocov1beta2.ConditionClusteringActive) + if cond == nil { + return fmt.Errorf("condition does not exists") + } + if cond.Status != metav1.ConditionFalse { + return fmt.Errorf("condition is not false") + } + return nil + }).Should(Succeed()) + }) }) diff --git a/docs/metrics.md b/docs/metrics.md index 7abdf1888..fa56c3896 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -26,6 +26,8 @@ All these metrics are prefixed with `moco_cluster_` and have `name` and `namespa | `failover_total` | The number of times MOCO changed the failed primary instance | Counter | | `replicas` | The number of mysqld instances in the cluster | Gauge | | `ready_replicas` | The number of ready mysqld Pods in the cluster | Gauge | +| `clustering_stopped` | 1 if the cluster is clustering stopped, 0 otherwise | Gauge | +| `reconciliation_stopped` | 1 if the cluster is reconciliation stopped, 0 otherwise | Gauge | | `errant_replicas` | The number of mysqld instances that have [errant transactions][errant] | Gauge | | `processing_time_seconds` | The length of time in seconds processing the cluster | Histogram | | `volume_resized_total` | The number of successful volume resizes | Counter | diff --git a/docs/usage.md b/docs/usage.md index 21afb7be6..f3089f7a9 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -722,3 +722,54 @@ Delete such pending Pods until PVC is actually removed. [MinIO]: https://min.io/ [EKS]: https://aws.amazon.com/eks/ [CronJob]: https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ + +### Stop Clustering and Reconciliation + +In MOCO, you can optionally stop the clustering and reconciliation of a MySQLCluster. + +To stop clustering and reconciliation, use the following commands. + +```console +$ kubectl moco stop clustering +$ kubectl moco stop reconciliation +``` + +To resume the stopped clustering and reconciliation, use the following commands. + +```console +$ kubectl moco start clustering +$ kubectl moco start reconciliation +``` + +You could use this feature in the following cases: + +1. To stop the replication of a MySQLCluster and perform a manual operation to align the GTID + * Run the `kubectl moco stop clustering` command on the MySQLCluster where you want to stop the replication +2. To suppress the full update of MySQLCluster that occurs during the upgrade of MOCO + * Run the `kubectl moco stop reconciliation` command on the MySQLCluster on which you want to suppress the update + +To check whether clustering and reconciliation are stopped, use `kubectl get mysqlcluster`. +Moreover, while clustering is stopped, `AVAILABLE` and `HEALTHY` values will be `Unknown`. + +```console +$ kubectl get mysqlcluster +NAME AVAILABLE HEALTHY PRIMARY SYNCED REPLICAS ERRANT REPLICAS CLUSTERING ACTIVE RECONCILE ACTIVE LAST BACKUP +test Unknown Unknown 0 3 False False +``` + +The MOCO controller outputs the following metrics to indicate that clustering has been stopped. +1 if the cluster is clustering or reconciliation stopped, 0 otherwise. + +```text +moco_cluster_clustering_stopped{name="mycluster", namespace="mynamesapce"} 1 +moco_cluster_reconciliation_stopped{name="mycluster", namespace="mynamesapce"} 1 +``` + +During the stop of clustering, monitoring of the cluster from MOCO will be halted, and the value of the following metrics will become NaN. + +```text +moco_cluster_available{name="test",namespace="default"} NaN +moco_cluster_healthy{name="test",namespace="default"} NaN +moco_cluster_ready_replicas{name="test",namespace="default"} NaN +moco_cluster_errant_replicas{name="test",namespace="default"} NaN +``` diff --git a/e2e/Makefile b/e2e/Makefile index d36089083..28c09432d 100644 --- a/e2e/Makefile +++ b/e2e/Makefile @@ -55,7 +55,7 @@ endif .PHONY: test test: env PATH="$$(pwd)/../bin:$$PATH" RUN_E2E=1 \ - go test -v -race -timeout 40m . -ginkgo.progress -ginkgo.v -ginkgo.failFast + go test -v -race -timeout 60m . -ginkgo.progress -ginkgo.v -ginkgo.failFast .PHONY: test-upgrade test-upgrade: diff --git a/e2e/stop_test.go b/e2e/stop_test.go new file mode 100644 index 000000000..d393f3bf8 --- /dev/null +++ b/e2e/stop_test.go @@ -0,0 +1,361 @@ +package e2e + +import ( + "bytes" + _ "embed" + "encoding/json" + "errors" + "fmt" + "math" + "time" + + mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/prometheus/common/expfmt" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +//go:embed testdata/stop.yaml +var stopYAML string + +//go:embed testdata/stop_changed.yaml +var stopChangedYAML string + +var _ = Context("stop reconciliation and clustering", func() { + if doUpgrade { + return + } + + It("should construct a 3-instance cluster", func() { + kubectlSafe(fillTemplate(stopYAML), "apply", "-f", "-") + Eventually(func() error { + cluster, err := getCluster("stop", "test") + if err != nil { + return err + } + for _, cond := range cluster.Status.Conditions { + if cond.Type != mocov1beta2.ConditionHealthy { + continue + } + if cond.Status == metav1.ConditionTrue { + return nil + } + return fmt.Errorf("cluster is not healthy: %s", cond.Status) + } + return errors.New("no health condition") + }).Should(Succeed()) + + kubectlSafe(nil, "moco", "mysql", "test", + "-n", "stop", + "-u", "moco-writable", + "--", "-e", "CREATE DATABASE test;") + kubectlSafe(nil, "moco", "mysql", "test", + "-n", "stop", + "-u", "moco-writable", + "--", "-e", "CREATE TABLE test.t1 (foo int);") + kubectlSafe(nil, "moco", "mysql", "test", + "-n", "stop", + "-u", "moco-writable", + "--", "-e", "INSERT INTO test.t1 (foo) VALUES (1); COMMIT;") + }) + + It("should stop reconciliation", func() { + kubectlSafe(nil, "moco", "stop", "reconciliation", "test", "-n", "stop") + Eventually(func() error { + cluster, err := getCluster("stop", "test") + if err != nil { + return err + } + for _, cond := range cluster.Status.Conditions { + if cond.Type != mocov1beta2.ConditionReconciliationActive { + continue + } + if cond.Status == metav1.ConditionFalse { + return nil + } + return fmt.Errorf("reconciliation is active: %s", cond.Status) + } + return errors.New("no reconciliation condition") + }).Should(Succeed()) + + kubectlSafe(fillTemplate(stopChangedYAML), "apply", "-f", "-") + + timeout := 30 * time.Second + Consistently(func() error { + out, err := kubectl(nil, "get", "-n", "stop", "statefulset", "moco-test", "-o", "json") + if err != nil { + return err + } + sts := &appsv1.StatefulSet{} + err = json.Unmarshal(out, sts) + if err != nil { + return err + } + + if _, ok := sts.Spec.Template.Labels["foo"]; ok { + return errors.New("label exists") + } + return nil + }, timeout).Should(Succeed()) + }) + + It("should restart reconciliation", func() { + kubectlSafe(nil, "moco", "start", "reconciliation", "test", "-n", "stop") + Eventually(func() error { + cluster, err := getCluster("stop", "test") + if err != nil { + return err + } + for _, cond := range cluster.Status.Conditions { + if cond.Type != mocov1beta2.ConditionReconciliationActive { + continue + } + if cond.Status == metav1.ConditionTrue { + return nil + } + return fmt.Errorf("reconciliation is not active: %s", cond.Status) + } + return errors.New("no reconciliation condition") + }).Should(Succeed()) + + Eventually(func() error { + out, err := kubectl(nil, "get", "-n", "stop", "statefulset", "moco-test", "-o", "json") + if err != nil { + return err + } + sts := &appsv1.StatefulSet{} + err = json.Unmarshal(out, sts) + if err != nil { + return err + } + + if _, ok := sts.Spec.Template.Labels["foo"]; !ok { + return errors.New("label does not exists") + } + + return nil + }).Should(Succeed()) + }) + + It("should stop clustering and prevent failover", func() { + kubectlSafe(nil, "moco", "stop", "clustering", "test", "-n", "stop") + Eventually(func() error { + cluster, err := getCluster("stop", "test") + if err != nil { + return err + } + for _, cond := range cluster.Status.Conditions { + if cond.Type != mocov1beta2.ConditionClusteringActive { + continue + } + if cond.Status == metav1.ConditionFalse { + return nil + } + return fmt.Errorf("reconciliation is active: %s", cond.Status) + } + return errors.New("no reconciliation condition") + }).Should(Succeed()) + + cluster, err := getCluster("stop", "test") + Expect(err).NotTo(HaveOccurred()) + + currentPrimaryIndex := cluster.Status.CurrentPrimaryIndex + kubectlSafe(nil, "exec", + fmt.Sprintf("moco-test-%d", currentPrimaryIndex), + "-n", "stop", + "-c", "mysqld", + "--", "kill", "1") + + timeout := 3 * time.Minute + Consistently(func() error { + cluster, err := getCluster("stop", "test") + if err != nil { + return err + } + if currentPrimaryIndex != cluster.Status.CurrentPrimaryIndex { + return errors.New("failover executed while clustering was stopped") + } + return nil + }, timeout).Should(Succeed()) + }) + + It("should resume clustering and execute failover", func() { + cluster, err := getCluster("stop", "test") + Expect(err).NotTo(HaveOccurred()) + + currentPrimaryIndex := cluster.Status.CurrentPrimaryIndex + + kubectlSafe(nil, "moco", "start", "clustering", "test", "-n", "stop") + Eventually(func() error { + cluster, err := getCluster("stop", "test") + if err != nil { + return err + } + for _, cond := range cluster.Status.Conditions { + if cond.Type != mocov1beta2.ConditionReconciliationActive { + continue + } + if cond.Status == metav1.ConditionTrue { + return nil + } + return fmt.Errorf("reconciliation is not active: %s", cond.Status) + } + return errors.New("no reconciliation condition") + }).Should(Succeed()) + + Eventually(func() error { + cluster, err := getCluster("stop", "test") + if err != nil { + return err + } + if currentPrimaryIndex == cluster.Status.CurrentPrimaryIndex { + return errors.New("failover not yet executed") + } + + for _, cond := range cluster.Status.Conditions { + if cond.Type != mocov1beta2.ConditionHealthy { + continue + } + if cond.Status == metav1.ConditionTrue { + return nil + } + return fmt.Errorf("cluster is not healthy: %s", cond.Status) + } + return errors.New("no health condition") + }).Should(Succeed()) + }) + + It("active metrics", func() { + Eventually(func() error { + cluster, err := getCluster("stop", "test") + if err != nil { + return err + } + reconcileCond := meta.FindStatusCondition(cluster.Status.Conditions, mocov1beta2.ConditionReconciliationActive) + if reconcileCond.Status == metav1.ConditionFalse { + return fmt.Errorf("reconciliation is stopped: %s", reconcileCond.Status) + } + clusteringCond := meta.FindStatusCondition(cluster.Status.Conditions, mocov1beta2.ConditionClusteringActive) + if clusteringCond.Status == metav1.ConditionFalse { + return fmt.Errorf("clustering is stopped: %s", clusteringCond.Status) + } + return nil + }).Should(Succeed()) + + out := kubectlSafe(nil, "-n", "moco-system", "get", "pods", "-l", "app.kubernetes.io/component=moco-controller", "-o", "json") + pods := &corev1.PodList{} + err := json.Unmarshal(out, pods) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(1)) + addr := pods.Items[0].Status.PodIP + out, err = runInPod("curl", "-sf", fmt.Sprintf("http://%s:8080/metrics", addr)) + Expect(err).NotTo(HaveOccurred()) + + mfs, err := (&expfmt.TextParser{}).TextToMetricFamilies(bytes.NewReader(out)) + Expect(err).NotTo(HaveOccurred()) + + clusteringMf := mfs["moco_cluster_clustering_stopped"] + Expect(clusteringMf).NotTo(BeNil()) + clusteringMetric := findMetric(clusteringMf, map[string]string{"namespace": "stop", "name": "test"}) + Expect(clusteringMetric).NotTo(BeNil()) + Expect(clusteringMetric.GetGauge().GetValue()).To(BeNumerically("==", 0)) + + reconcileMf := mfs["moco_cluster_reconciliation_stopped"] + Expect(reconcileMf).NotTo(BeNil()) + reconcileMetric := findMetric(reconcileMf, map[string]string{"namespace": "stop", "name": "test"}) + Expect(reconcileMetric).NotTo(BeNil()) + Expect(reconcileMetric.GetGauge().GetValue()).To(BeNumerically("==", 0)) + }) + + It("stopped metrics", func() { + kubectlSafe(nil, "moco", "stop", "clustering", "test", "-n", "stop") + kubectlSafe(nil, "moco", "stop", "reconciliation", "test", "-n", "stop") + + Eventually(func() error { + cluster, err := getCluster("stop", "test") + if err != nil { + return err + } + reconcileCond := meta.FindStatusCondition(cluster.Status.Conditions, mocov1beta2.ConditionReconciliationActive) + if reconcileCond.Status == metav1.ConditionTrue { + return fmt.Errorf("reconciliation is active: %s", reconcileCond.Status) + } + clusteringCond := meta.FindStatusCondition(cluster.Status.Conditions, mocov1beta2.ConditionClusteringActive) + if clusteringCond.Status == metav1.ConditionTrue { + return fmt.Errorf("clustering is active: %s", clusteringCond.Status) + } + return nil + }).Should(Succeed()) + + out := kubectlSafe(nil, "-n", "moco-system", "get", "pods", "-l", "app.kubernetes.io/component=moco-controller", "-o", "json") + pods := &corev1.PodList{} + err := json.Unmarshal(out, pods) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(1)) + addr := pods.Items[0].Status.PodIP + out, err = runInPod("curl", "-sf", fmt.Sprintf("http://%s:8080/metrics", addr)) + Expect(err).NotTo(HaveOccurred()) + + mfs, err := (&expfmt.TextParser{}).TextToMetricFamilies(bytes.NewReader(out)) + Expect(err).NotTo(HaveOccurred()) + + clusteringMf := mfs["moco_cluster_clustering_stopped"] + Expect(clusteringMf).NotTo(BeNil()) + clusteringMetric := findMetric(clusteringMf, map[string]string{"namespace": "stop", "name": "test"}) + Expect(clusteringMetric).NotTo(BeNil()) + Expect(clusteringMetric.GetGauge().GetValue()).To(BeNumerically("==", 1)) + + reconcileMf := mfs["moco_cluster_reconciliation_stopped"] + Expect(reconcileMf).NotTo(BeNil()) + reconcileMetric := findMetric(reconcileMf, map[string]string{"namespace": "stop", "name": "test"}) + Expect(reconcileMetric).NotTo(BeNil()) + Expect(reconcileMetric.GetGauge().GetValue()).To(BeNumerically("==", 1)) + + healthyMf := mfs["moco_cluster_healthy"] + Expect(healthyMf).NotTo(BeNil()) + healthyMetric := findMetric(healthyMf, map[string]string{"namespace": "stop", "name": "test"}) + Expect(healthyMetric).NotTo(BeNil()) + Expect(math.IsNaN(healthyMetric.GetGauge().GetValue())).To(BeTrue()) + + availableMf := mfs["moco_cluster_available"] + Expect(availableMf).NotTo(BeNil()) + availableMetric := findMetric(availableMf, map[string]string{"namespace": "stop", "name": "test"}) + Expect(availableMetric).NotTo(BeNil()) + Expect(math.IsNaN(availableMetric.GetGauge().GetValue())).To(BeTrue()) + + readyReplicasMf := mfs["moco_cluster_ready_replicas"] + Expect(readyReplicasMf).NotTo(BeNil()) + readyReplicasMetric := findMetric(readyReplicasMf, map[string]string{"namespace": "stop", "name": "test"}) + Expect(readyReplicasMetric).NotTo(BeNil()) + Expect(math.IsNaN(readyReplicasMetric.GetGauge().GetValue())).To(BeTrue()) + + errantReplicasMf := mfs["moco_cluster_errant_replicas"] + Expect(errantReplicasMf).NotTo(BeNil()) + errantReplicasMetric := findMetric(errantReplicasMf, map[string]string{"namespace": "stop", "name": "test"}) + Expect(errantReplicasMetric).NotTo(BeNil()) + Expect(math.IsNaN(errantReplicasMetric.GetGauge().GetValue())).To(BeTrue()) + }) + + It("should delete clusters", func() { + kubectlSafe(nil, "delete", "-n", "stop", "mysqlclusters", "--all") + + Eventually(func() error { + out, err := kubectl(nil, "get", "-n", "stop", "pod", "-o", "json") + if err != nil { + return err + } + pods := &corev1.PodList{} + if err := json.Unmarshal(out, pods); err != nil { + return err + } + if len(pods.Items) > 0 { + return errors.New("wait until all Pods are deleted") + } + return nil + }).Should(Succeed()) + }) +}) diff --git a/e2e/testdata/stop.yaml b/e2e/testdata/stop.yaml new file mode 100644 index 000000000..bd75b7951 --- /dev/null +++ b/e2e/testdata/stop.yaml @@ -0,0 +1,25 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: stop +--- +apiVersion: moco.cybozu.com/v1beta2 +kind: MySQLCluster +metadata: + namespace: stop + name: test +spec: + replicas: 3 + podTemplate: + spec: + containers: + - name: mysqld + image: ghcr.io/cybozu-go/moco/mysql:{{ . }} + volumeClaimTemplates: + - metadata: + name: mysql-data + spec: + accessModes: [ "ReadWriteOnce" ] + resources: + requests: + storage: 1Gi diff --git a/e2e/testdata/stop_changed.yaml b/e2e/testdata/stop_changed.yaml new file mode 100644 index 000000000..c951694a7 --- /dev/null +++ b/e2e/testdata/stop_changed.yaml @@ -0,0 +1,28 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: stop +--- +apiVersion: moco.cybozu.com/v1beta2 +kind: MySQLCluster +metadata: + namespace: stop + name: test +spec: + replicas: 3 + podTemplate: + metadata: + labels: + foo: bar + spec: + containers: + - name: mysqld + image: ghcr.io/cybozu-go/moco/mysql:{{ . }} + volumeClaimTemplates: + - metadata: + name: mysql-data + spec: + accessModes: [ "ReadWriteOnce" ] + resources: + requests: + storage: 1Gi diff --git a/pkg/constants/meta.go b/pkg/constants/meta.go index 87c34c7dc..061284ee7 100644 --- a/pkg/constants/meta.go +++ b/pkg/constants/meta.go @@ -17,8 +17,10 @@ const ( // annotation keys and values const ( - AnnDemote = "moco.cybozu.com/demote" - AnnSecretVersion = "moco.cybozu.com/secret-version" + AnnDemote = "moco.cybozu.com/demote" + AnnSecretVersion = "moco.cybozu.com/secret-version" + AnnClusteringStopped = "moco.cybozu.com/clustering-stopped" + AnnReconciliationStopped = "moco.cybozu.com/reconciliation-stopped" ) // MySQLClusterFinalizer is the finalizer specifier for MySQLCluster. diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 71d24b9ea..18ddc0df2 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -27,6 +27,9 @@ var ( VolumeResizedErrorTotal *prometheus.CounterVec StatefulSetRecreateTotal *prometheus.CounterVec StatefulSetRecreateErrorTotal *prometheus.CounterVec + + ClusteringStoppedVec *prometheus.GaugeVec + ReconciliationStoppedVec *prometheus.GaugeVec ) // Backup related metrics @@ -201,4 +204,20 @@ func Register(registry prometheus.Registerer) { Help: "The number of failed StatefulSet recreates", }, []string{"name", "namespace"}) registry.MustRegister(StatefulSetRecreateErrorTotal) + + ClusteringStoppedVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: clusteringSubsystem, + Name: "clustering_stopped", + Help: "Indicates if clustering has stopped", + }, []string{"name", "namespace"}) + registry.MustRegister(ClusteringStoppedVec) + + ReconciliationStoppedVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: clusteringSubsystem, + Name: "reconciliation_stopped", + Help: "Indicates if reconciliation has stopped", + }, []string{"name", "namespace"}) + registry.MustRegister(ReconciliationStoppedVec) }