Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ymmt2005 committed Apr 30, 2021
1 parent f2ad972 commit c3b14cb
Show file tree
Hide file tree
Showing 12 changed files with 57 additions and 34 deletions.
2 changes: 1 addition & 1 deletion api/v1beta1/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (r *MySQLCluster) ReplicaServiceName() string {

// PodHostname returns the hostname of a Pod with the given index.
func (r *MySQLCluster) PodHostname(index int) string {
return fmt.Sprintf("%s.%s.%s.svc", r.PodName(index), r.PrefixedName(), r.Namespace)
return fmt.Sprintf("%s.%s.%s.svc", r.PodName(index), r.HeadlessServiceName(), r.Namespace)
}

// SlowQueryLogAgentConfigMapName returns the name of the slow query log agent config name.
Expand Down
6 changes: 2 additions & 4 deletions clustering/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync"
"time"

mocov1beta1 "github.com/cybozu-go/moco/api/v1beta1"
"github.com/cybozu-go/moco/pkg/dbop"
"github.com/go-logr/logr"
_ "github.com/go-sql-driver/mysql"
Expand All @@ -24,7 +23,7 @@ import (
//
// This interface is meant to be used by MySQLClusterReconciler.
type ClusterManager interface {
Update(context.Context, *mocov1beta1.MySQLCluster)
Update(context.Context, types.NamespacedName)
Stop(types.NamespacedName)
}

Expand Down Expand Up @@ -56,11 +55,10 @@ type clusterManager struct {
processes map[string]*managerProcess
}

func (m *clusterManager) Update(ctx context.Context, cluster *mocov1beta1.MySQLCluster) {
func (m *clusterManager) Update(ctx context.Context, name types.NamespacedName) {
m.mu.Lock()
defer m.mu.Unlock()

name := client.ObjectKeyFromObject(cluster)
key := name.String()
p, ok := m.processes[key]
if ok {
Expand Down
6 changes: 3 additions & 3 deletions clustering/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ var _ = Describe("manager", func() {

cluster, err := testGetCluster(ctx)
Expect(err).NotTo(HaveOccurred())
cm.Update(ctx, cluster)
cm.Update(ctx, client.ObjectKeyFromObject(cluster))

Eventually(func() error {
cluster, err = testGetCluster(ctx)
Expand Down Expand Up @@ -273,7 +273,7 @@ var _ = Describe("manager", func() {
cm := NewClusterManager(1*time.Second, mgr, of, af, stdr.New(nil))
cluster, err := testGetCluster(ctx)
Expect(err).NotTo(HaveOccurred())
cm.Update(ctx, cluster)
cm.Update(ctx, client.ObjectKeyFromObject(cluster))
defer func() {
cm.Stop(client.ObjectKeyFromObject(cluster))
time.Sleep(400 * time.Millisecond)
Expand Down Expand Up @@ -614,7 +614,7 @@ var _ = Describe("manager", func() {
cm := NewClusterManager(1*time.Second, mgr, of, af, stdr.New(nil))
cluster, err := testGetCluster(ctx)
Expect(err).NotTo(HaveOccurred())
cm.Update(ctx, cluster)
cm.Update(ctx, client.ObjectKeyFromObject(cluster))
defer func() {
cm.Stop(client.ObjectKeyFromObject(cluster))
time.Sleep(400 * time.Millisecond)
Expand Down
14 changes: 8 additions & 6 deletions clustering/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ type StatusSet struct {
// Close closes `ss.DBOps`.
func (ss *StatusSet) Close() {
for _, op := range ss.DBOps {
op.Close()
if op != nil {
op.Close()
}
}
}

Expand Down Expand Up @@ -163,18 +165,18 @@ func (p *managerProcess) GatherStatus(ctx context.Context) (*StatusSet, error) {
}

ss.DBOps = make([]dbop.Operator, cluster.Spec.Replicas)
defer func() {
if ss.State == StateUndecided {
ss.Close()
}
}()
for i := 0; i < int(cluster.Spec.Replicas); i++ {
op, err := p.dbf.New(ctx, cluster, passwd, i)
if err != nil {
return nil, err
}
ss.DBOps[i] = op
}
defer func() {
if ss.State == StateUndecided {
ss.Close()
}
}()

ss.MySQLStatus = make([]*dbop.MySQLInstanceStatus, cluster.Spec.Replicas)
var wg sync.WaitGroup
Expand Down
10 changes: 6 additions & 4 deletions controllers/mysql_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,12 @@ func (r *MySQLClusterReconciler) makeV1AgentContainer(cluster *mocov1beta1.MySQL
return c
}

func (r *MySQLClusterReconciler) makeV1SlowQueryLogContainer(sts *appsv1.StatefulSet) corev1.Container {
for _, c := range sts.Spec.Template.Spec.Containers {
if c.Name == constants.SlowQueryLogAgentContainerName {
return c
func (r *MySQLClusterReconciler) makeV1SlowQueryLogContainer(sts *appsv1.StatefulSet, force bool) corev1.Container {
if !force {
for _, c := range sts.Spec.Template.Spec.Containers {
if c.Name == constants.SlowQueryLogAgentContainerName {
return c
}
}
}

Expand Down
8 changes: 5 additions & 3 deletions controllers/mysqlcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (r *MySQLClusterReconciler) reconcileV1(ctx context.Context, req ctrl.Reque
}
}

r.ClusterManager.Update(ctx, cluster)
r.ClusterManager.Update(ctx, client.ObjectKeyFromObject(cluster))
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -476,6 +476,7 @@ func (r *MySQLClusterReconciler) reconcileV1Service1(ctx context.Context, cluste
if headless {
saSpec.ClusterIP = corev1.ClusterIPNone
saSpec.Type = corev1.ServiceTypeClusterIP
saSpec.PublishNotReadyAddresses = true
} else {
saSpec.ClusterIP = svc.Spec.ClusterIP
if len(saSpec.Type) == 0 {
Expand Down Expand Up @@ -557,7 +558,7 @@ func (r *MySQLClusterReconciler) reconcileV1StatefulSet(ctx context.Context, req
MatchLabels: labelSet(cluster, false),
}
sts.Spec.PodManagementPolicy = appsv1.ParallelPodManagement
sts.Spec.ServiceName = cluster.PrefixedName()
sts.Spec.ServiceName = cluster.HeadlessServiceName()

sts.Spec.VolumeClaimTemplates = make([]corev1.PersistentVolumeClaim, len(cluster.Spec.VolumeClaimTemplates))
for i, v := range cluster.Spec.VolumeClaimTemplates {
Expand Down Expand Up @@ -654,7 +655,8 @@ func (r *MySQLClusterReconciler) reconcileV1StatefulSet(ctx context.Context, req
containers = append(containers, mysqldContainer)
containers = append(containers, r.makeV1AgentContainer(cluster, sts.Spec.Template.Spec.Containers))
if !cluster.Spec.DisableSlowQueryLogContainer {
containers = append(containers, r.makeV1SlowQueryLogContainer(sts))
force := cluster.Status.ReconcileInfo.Generation != cluster.Generation
containers = append(containers, r.makeV1SlowQueryLogContainer(sts, force))
}
containers = append(containers, r.makeV1OptionalContainers(cluster, sts.Spec.Template.Spec.Containers)...)
podSpec.Containers = containers
Expand Down
5 changes: 3 additions & 2 deletions controllers/mysqlcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ type mockManager struct {

var _ clustering.ClusterManager = &mockManager{}

func (m *mockManager) Update(ctx context.Context, cluster *mocov1beta1.MySQLCluster) {
func (m *mockManager) Update(ctx context.Context, key types.NamespacedName) {
m.mu.Lock()
defer m.mu.Unlock()

key := client.ObjectKeyFromObject(cluster)
m.clusters[key.String()] = struct{}{}
}

Expand Down Expand Up @@ -454,6 +453,8 @@ var _ = Describe("MySQLCluster reconciler", func() {
Expect(primary.Spec.Selector).To(HaveKeyWithValue("moco.cybozu.com/role", "primary"))
Expect(replica.Spec.Selector).To(HaveKeyWithValue("moco.cybozu.com/role", "replica"))

Expect(headless.Spec.PublishNotReadyAddresses).To(BeTrue())

cluster = &mocov1beta1.MySQLCluster{}
err = k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: "test"}, cluster)
Expect(err).NotTo(HaveOccurred())
Expand Down
5 changes: 4 additions & 1 deletion docs/reconcile.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ It updates the StatefulSet only when the update is a must.

The StatefulSet will be updated when:

- The fields under `spec` of MySQLCluster are modified.
- Some fields under `spec` of MySQLCluster are modified.
- `my.cnf` for mysqld is updated.
- the version of the reconciler used to reconcile the StatefulSet is obsoleted.
- the image of moco-agent given to the controller is updated.
Expand All @@ -36,6 +36,9 @@ The StatefulSet will be updated when:
- the image of fluent-bit given to the controller is changed.
- because the controller does not depend on fluent-bit.

The fluent-bit sidecar container is updated only when some fields under
`spec` of MySQLCluster are modified.

## Service

MOCO creates three Services for each MySQLCluster, that is:
Expand Down
15 changes: 15 additions & 0 deletions e2e/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,5 +198,20 @@ var _ = Context("lifecycle", func() {
}
return fmt.Errorf("pending service accounts: %+v", sas.Items)
}).Should(Succeed())
Eventually(func() error {
services := &corev1.ServiceList{}
out, err := kubectl(nil, "-n", "foo", "get", "service", "-o", "json")
if err != nil {
return err
}
err = json.Unmarshal(out, services)
if err != nil {
return err
}
if len(services.Items) == 0 {
return nil
}
return fmt.Errorf("pending services: %+v", services.Items)
}).Should(Succeed())
})
})
2 changes: 1 addition & 1 deletion pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (
FluentBitConfigPath = "/fluent-bit/etc"
)

// Environment variables for moco-agent
// Environment variables
const (
PodNameEnvKey = "POD_NAME"
PodNamespaceEnvKey = "POD_NAMESPACE"
Expand Down
4 changes: 2 additions & 2 deletions pkg/dbop/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ func (f defaultFactory) New(ctx context.Context, cluster *mocov1beta1.MySQLClust
cfg.ParseTime = true
cfg.Timeout = connTimeout
cfg.ReadTimeout = readTimeout
db, err := sqlx.Connect("mysql", cfg.FormatDSN())
db, err := sqlx.Open("mysql", cfg.FormatDSN())
if err != nil {
return nil, fmt.Errorf("failed to connect to %s: %w", cluster.PodName(index), err)
return nil, fmt.Errorf("failed to open %s: %w", cluster.PodName(index), err)
}
db.SetMaxIdleConns(1)
db.SetConnMaxIdleTime(30 * time.Second)
Expand Down
14 changes: 7 additions & 7 deletions pkg/password/password.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ const (
agentPasswordKey = "AGENT_PASSWORD"
replicationPasswordKey = "REPLICATION_PASSWORD"
cloneDonorPasswordKey = "CLONE_DONOR_PASSWORD"
ReadOnlyPasswordKey = "READONLY_PASSWORD"
WritablePasswordKey = "WRITABLE_PASSWORD"
readOnlyPasswordKey = "READONLY_PASSWORD"
writablePasswordKey = "WRITABLE_PASSWORD"
)

// MySQLPassword represents a set of passwords of MySQL users for MOCO
Expand Down Expand Up @@ -87,8 +87,8 @@ func NewMySQLPasswordFromSecret(secret *corev1.Secret) (*MySQLPassword, error) {
agent: string(secret.Data[agentPasswordKey]),
replicator: string(secret.Data[replicationPasswordKey]),
donor: string(secret.Data[cloneDonorPasswordKey]),
readOnly: string(secret.Data[ReadOnlyPasswordKey]),
writable: string(secret.Data[WritablePasswordKey]),
readOnly: string(secret.Data[readOnlyPasswordKey]),
writable: string(secret.Data[writablePasswordKey]),
}, nil
}

Expand All @@ -106,8 +106,8 @@ func (p MySQLPassword) ToSecret() *corev1.Secret {
agentPasswordKey: []byte(p.agent),
replicationPasswordKey: []byte(p.replicator),
cloneDonorPasswordKey: []byte(p.donor),
ReadOnlyPasswordKey: []byte(p.readOnly),
WritablePasswordKey: []byte(p.writable),
readOnlyPasswordKey: []byte(p.readOnly),
writablePasswordKey: []byte(p.writable),
},
}
}
Expand All @@ -129,9 +129,9 @@ password="%s"
},
},
Data: map[string][]byte{
constants.AdminMyCnf: formatMyCnf(constants.AdminUser, p.admin),
constants.ReadOnlyMyCnf: formatMyCnf(constants.ReadOnlyUser, p.readOnly),
constants.WritableMyCnf: formatMyCnf(constants.WritableUser, p.writable),
constants.AdminMyCnf: formatMyCnf(constants.AdminUser, p.admin),
},
}
}
Expand Down

0 comments on commit c3b14cb

Please sign in to comment.