diff --git a/api/v1beta1/mysqlcluster_types.go b/api/v1beta1/mysqlcluster_types.go index 329c58a76..2835255af 100644 --- a/api/v1beta1/mysqlcluster_types.go +++ b/api/v1beta1/mysqlcluster_types.go @@ -397,7 +397,7 @@ func (r *MySQLCluster) ReplicaServiceName() string { // PodHostname returns the hostname of a Pod with the given index. func (r *MySQLCluster) PodHostname(index int) string { - return fmt.Sprintf("%s.%s.%s.svc", r.PodName(index), r.PrefixedName(), r.Namespace) + return fmt.Sprintf("%s.%s.%s.svc", r.PodName(index), r.HeadlessServiceName(), r.Namespace) } // SlowQueryLogAgentConfigMapName returns the name of the slow query log agent config name. diff --git a/clustering/manager.go b/clustering/manager.go index a826fdd50..20765dbd5 100644 --- a/clustering/manager.go +++ b/clustering/manager.go @@ -5,7 +5,6 @@ import ( "sync" "time" - mocov1beta1 "github.com/cybozu-go/moco/api/v1beta1" "github.com/cybozu-go/moco/pkg/dbop" "github.com/go-logr/logr" _ "github.com/go-sql-driver/mysql" @@ -24,7 +23,7 @@ import ( // // This interface is meant to be used by MySQLClusterReconciler. type ClusterManager interface { - Update(context.Context, *mocov1beta1.MySQLCluster) + Update(context.Context, types.NamespacedName) Stop(types.NamespacedName) } @@ -56,11 +55,10 @@ type clusterManager struct { processes map[string]*managerProcess } -func (m *clusterManager) Update(ctx context.Context, cluster *mocov1beta1.MySQLCluster) { +func (m *clusterManager) Update(ctx context.Context, name types.NamespacedName) { m.mu.Lock() defer m.mu.Unlock() - name := client.ObjectKeyFromObject(cluster) key := name.String() p, ok := m.processes[key] if ok { diff --git a/clustering/manager_test.go b/clustering/manager_test.go index bee26e24c..2b5838e34 100644 --- a/clustering/manager_test.go +++ b/clustering/manager_test.go @@ -153,7 +153,7 @@ var _ = Describe("manager", func() { cluster, err := testGetCluster(ctx) Expect(err).NotTo(HaveOccurred()) - cm.Update(ctx, cluster) + cm.Update(ctx, client.ObjectKeyFromObject(cluster)) Eventually(func() error { cluster, err = testGetCluster(ctx) @@ -273,7 +273,7 @@ var _ = Describe("manager", func() { cm := NewClusterManager(1*time.Second, mgr, of, af, stdr.New(nil)) cluster, err := testGetCluster(ctx) Expect(err).NotTo(HaveOccurred()) - cm.Update(ctx, cluster) + cm.Update(ctx, client.ObjectKeyFromObject(cluster)) defer func() { cm.Stop(client.ObjectKeyFromObject(cluster)) time.Sleep(400 * time.Millisecond) @@ -614,7 +614,7 @@ var _ = Describe("manager", func() { cm := NewClusterManager(1*time.Second, mgr, of, af, stdr.New(nil)) cluster, err := testGetCluster(ctx) Expect(err).NotTo(HaveOccurred()) - cm.Update(ctx, cluster) + cm.Update(ctx, client.ObjectKeyFromObject(cluster)) defer func() { cm.Stop(client.ObjectKeyFromObject(cluster)) time.Sleep(400 * time.Millisecond) diff --git a/clustering/status.go b/clustering/status.go index 0e5ae3a70..b55d7da10 100644 --- a/clustering/status.go +++ b/clustering/status.go @@ -91,7 +91,9 @@ type StatusSet struct { // Close closes `ss.DBOps`. func (ss *StatusSet) Close() { for _, op := range ss.DBOps { - op.Close() + if op != nil { + op.Close() + } } } @@ -163,6 +165,11 @@ func (p *managerProcess) GatherStatus(ctx context.Context) (*StatusSet, error) { } ss.DBOps = make([]dbop.Operator, cluster.Spec.Replicas) + defer func() { + if ss.State == StateUndecided { + ss.Close() + } + }() for i := 0; i < int(cluster.Spec.Replicas); i++ { op, err := p.dbf.New(ctx, cluster, passwd, i) if err != nil { @@ -170,11 +177,6 @@ func (p *managerProcess) GatherStatus(ctx context.Context) (*StatusSet, error) { } ss.DBOps[i] = op } - defer func() { - if ss.State == StateUndecided { - ss.Close() - } - }() ss.MySQLStatus = make([]*dbop.MySQLInstanceStatus, cluster.Spec.Replicas) var wg sync.WaitGroup diff --git a/controllers/mysql_container.go b/controllers/mysql_container.go index c071a932b..fe9c4ba55 100644 --- a/controllers/mysql_container.go +++ b/controllers/mysql_container.go @@ -153,10 +153,12 @@ func (r *MySQLClusterReconciler) makeV1AgentContainer(cluster *mocov1beta1.MySQL return c } -func (r *MySQLClusterReconciler) makeV1SlowQueryLogContainer(sts *appsv1.StatefulSet) corev1.Container { - for _, c := range sts.Spec.Template.Spec.Containers { - if c.Name == constants.SlowQueryLogAgentContainerName { - return c +func (r *MySQLClusterReconciler) makeV1SlowQueryLogContainer(sts *appsv1.StatefulSet, force bool) corev1.Container { + if !force { + for _, c := range sts.Spec.Template.Spec.Containers { + if c.Name == constants.SlowQueryLogAgentContainerName { + return c + } } } diff --git a/controllers/mysqlcluster_controller.go b/controllers/mysqlcluster_controller.go index 884088d08..e6b3eef72 100644 --- a/controllers/mysqlcluster_controller.go +++ b/controllers/mysqlcluster_controller.go @@ -191,7 +191,7 @@ func (r *MySQLClusterReconciler) reconcileV1(ctx context.Context, req ctrl.Reque } } - r.ClusterManager.Update(ctx, cluster) + r.ClusterManager.Update(ctx, client.ObjectKeyFromObject(cluster)) return ctrl.Result{}, nil } @@ -476,6 +476,7 @@ func (r *MySQLClusterReconciler) reconcileV1Service1(ctx context.Context, cluste if headless { saSpec.ClusterIP = corev1.ClusterIPNone saSpec.Type = corev1.ServiceTypeClusterIP + saSpec.PublishNotReadyAddresses = true } else { saSpec.ClusterIP = svc.Spec.ClusterIP if len(saSpec.Type) == 0 { @@ -557,7 +558,7 @@ func (r *MySQLClusterReconciler) reconcileV1StatefulSet(ctx context.Context, req MatchLabels: labelSet(cluster, false), } sts.Spec.PodManagementPolicy = appsv1.ParallelPodManagement - sts.Spec.ServiceName = cluster.PrefixedName() + sts.Spec.ServiceName = cluster.HeadlessServiceName() sts.Spec.VolumeClaimTemplates = make([]corev1.PersistentVolumeClaim, len(cluster.Spec.VolumeClaimTemplates)) for i, v := range cluster.Spec.VolumeClaimTemplates { @@ -654,7 +655,8 @@ func (r *MySQLClusterReconciler) reconcileV1StatefulSet(ctx context.Context, req containers = append(containers, mysqldContainer) containers = append(containers, r.makeV1AgentContainer(cluster, sts.Spec.Template.Spec.Containers)) if !cluster.Spec.DisableSlowQueryLogContainer { - containers = append(containers, r.makeV1SlowQueryLogContainer(sts)) + force := cluster.Status.ReconcileInfo.Generation != cluster.Generation + containers = append(containers, r.makeV1SlowQueryLogContainer(sts, force)) } containers = append(containers, r.makeV1OptionalContainers(cluster, sts.Spec.Template.Spec.Containers)...) podSpec.Containers = containers diff --git a/controllers/mysqlcluster_controller_test.go b/controllers/mysqlcluster_controller_test.go index 9a2f6eecf..c900b8834 100644 --- a/controllers/mysqlcluster_controller_test.go +++ b/controllers/mysqlcluster_controller_test.go @@ -32,11 +32,10 @@ type mockManager struct { var _ clustering.ClusterManager = &mockManager{} -func (m *mockManager) Update(ctx context.Context, cluster *mocov1beta1.MySQLCluster) { +func (m *mockManager) Update(ctx context.Context, key types.NamespacedName) { m.mu.Lock() defer m.mu.Unlock() - key := client.ObjectKeyFromObject(cluster) m.clusters[key.String()] = struct{}{} } @@ -454,6 +453,8 @@ var _ = Describe("MySQLCluster reconciler", func() { Expect(primary.Spec.Selector).To(HaveKeyWithValue("moco.cybozu.com/role", "primary")) Expect(replica.Spec.Selector).To(HaveKeyWithValue("moco.cybozu.com/role", "replica")) + Expect(headless.Spec.PublishNotReadyAddresses).To(BeTrue()) + cluster = &mocov1beta1.MySQLCluster{} err = k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: "test"}, cluster) Expect(err).NotTo(HaveOccurred()) diff --git a/docs/reconcile.md b/docs/reconcile.md index 1ba4d5600..6486984e7 100644 --- a/docs/reconcile.md +++ b/docs/reconcile.md @@ -26,7 +26,7 @@ It updates the StatefulSet only when the update is a must. The StatefulSet will be updated when: -- The fields under `spec` of MySQLCluster are modified. +- Some fields under `spec` of MySQLCluster are modified. - `my.cnf` for mysqld is updated. - the version of the reconciler used to reconcile the StatefulSet is obsoleted. - the image of moco-agent given to the controller is updated. @@ -36,6 +36,9 @@ The StatefulSet will be updated when: - the image of fluent-bit given to the controller is changed. - because the controller does not depend on fluent-bit. +The fluent-bit sidecar container is updated only when some fields under +`spec` of MySQLCluster are modified. + ## Service MOCO creates three Services for each MySQLCluster, that is: diff --git a/e2e/lifecycle_test.go b/e2e/lifecycle_test.go index f54c0f092..8207e7779 100644 --- a/e2e/lifecycle_test.go +++ b/e2e/lifecycle_test.go @@ -198,5 +198,20 @@ var _ = Context("lifecycle", func() { } return fmt.Errorf("pending service accounts: %+v", sas.Items) }).Should(Succeed()) + Eventually(func() error { + services := &corev1.ServiceList{} + out, err := kubectl(nil, "-n", "foo", "get", "service", "-o", "json") + if err != nil { + return err + } + err = json.Unmarshal(out, services) + if err != nil { + return err + } + if len(services.Items) == 0 { + return nil + } + return fmt.Errorf("pending services: %+v", services.Items) + }).Should(Succeed()) }) }) diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 9e4636334..c933a4f82 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -57,7 +57,7 @@ const ( FluentBitConfigPath = "/fluent-bit/etc" ) -// Environment variables for moco-agent +// Environment variables const ( PodNameEnvKey = "POD_NAME" PodNamespaceEnvKey = "POD_NAMESPACE" diff --git a/pkg/dbop/operator.go b/pkg/dbop/operator.go index d136cdba2..06a0a20bf 100644 --- a/pkg/dbop/operator.go +++ b/pkg/dbop/operator.go @@ -95,9 +95,9 @@ func (f defaultFactory) New(ctx context.Context, cluster *mocov1beta1.MySQLClust cfg.ParseTime = true cfg.Timeout = connTimeout cfg.ReadTimeout = readTimeout - db, err := sqlx.Connect("mysql", cfg.FormatDSN()) + db, err := sqlx.Open("mysql", cfg.FormatDSN()) if err != nil { - return nil, fmt.Errorf("failed to connect to %s: %w", cluster.PodName(index), err) + return nil, fmt.Errorf("failed to open %s: %w", cluster.PodName(index), err) } db.SetMaxIdleConns(1) db.SetConnMaxIdleTime(30 * time.Second) diff --git a/pkg/password/password.go b/pkg/password/password.go index 6b19aba7f..5ab8e34f4 100644 --- a/pkg/password/password.go +++ b/pkg/password/password.go @@ -20,8 +20,8 @@ const ( agentPasswordKey = "AGENT_PASSWORD" replicationPasswordKey = "REPLICATION_PASSWORD" cloneDonorPasswordKey = "CLONE_DONOR_PASSWORD" - ReadOnlyPasswordKey = "READONLY_PASSWORD" - WritablePasswordKey = "WRITABLE_PASSWORD" + readOnlyPasswordKey = "READONLY_PASSWORD" + writablePasswordKey = "WRITABLE_PASSWORD" ) // MySQLPassword represents a set of passwords of MySQL users for MOCO @@ -87,8 +87,8 @@ func NewMySQLPasswordFromSecret(secret *corev1.Secret) (*MySQLPassword, error) { agent: string(secret.Data[agentPasswordKey]), replicator: string(secret.Data[replicationPasswordKey]), donor: string(secret.Data[cloneDonorPasswordKey]), - readOnly: string(secret.Data[ReadOnlyPasswordKey]), - writable: string(secret.Data[WritablePasswordKey]), + readOnly: string(secret.Data[readOnlyPasswordKey]), + writable: string(secret.Data[writablePasswordKey]), }, nil } @@ -106,8 +106,8 @@ func (p MySQLPassword) ToSecret() *corev1.Secret { agentPasswordKey: []byte(p.agent), replicationPasswordKey: []byte(p.replicator), cloneDonorPasswordKey: []byte(p.donor), - ReadOnlyPasswordKey: []byte(p.readOnly), - WritablePasswordKey: []byte(p.writable), + readOnlyPasswordKey: []byte(p.readOnly), + writablePasswordKey: []byte(p.writable), }, } } @@ -129,9 +129,9 @@ password="%s" }, }, Data: map[string][]byte{ + constants.AdminMyCnf: formatMyCnf(constants.AdminUser, p.admin), constants.ReadOnlyMyCnf: formatMyCnf(constants.ReadOnlyUser, p.readOnly), constants.WritableMyCnf: formatMyCnf(constants.WritableUser, p.writable), - constants.AdminMyCnf: formatMyCnf(constants.AdminUser, p.admin), }, } }