Skip to content

Commit

Permalink
[pull] master from arana-db:master (#4)
Browse files Browse the repository at this point in the history
* fix: optimize show open tables xxx statment (arana-db#676) (arana-db#825)

* fix: optimize show open tables xxx statment (arana-db#676)

* fix: convert english comments (arana-db#676)

* fix: fix bug can't match any group (arana-db#676)

* Support dts to replica databases/groups (arana-db#834)

* Support dts to replica databases/groups

* feat: check dts status whether the replication task is finished

* test: misc ut (arana-db#824)

---------

Co-authored-by: null <[email protected]>
Co-authored-by: csynineyang <[email protected]>
Co-authored-by: baerwang <[email protected]>
  • Loading branch information
4 people authored Sep 26, 2024
1 parent a27c47b commit cf1112e
Show file tree
Hide file tree
Showing 13 changed files with 510 additions and 24 deletions.
6 changes: 6 additions & 0 deletions conf/bootstrap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ registry:
options:
endpoints: "http://etcd:2379"

dts:
enable: false
name: dtle
options:
endpoints: "http://dtle:4646"

# name: nacos
# options:
# endpoints: "127.0.0.1:8848"
Expand Down
3 changes: 3 additions & 0 deletions pkg/admin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type configWriter interface {
// UpsertCluster upserts a cluster into an existing tenant.
UpsertCluster(ctx context.Context, tenant, cluster string, body *ClusterDTO) error

// ExtendCluster extends a cluster in an existing tenant.
ExtendCluster(ctx context.Context, tenant, cluster string, body *ClusterDTO) error

// RemoveCluster removes a cluster from an existing tenant.
RemoveCluster(ctx context.Context, tenant, cluster string) error

Expand Down
250 changes: 250 additions & 0 deletions pkg/admin/config_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
package admin

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
)

import (
Expand Down Expand Up @@ -336,6 +341,251 @@ func (cs *MyConfigService) UpsertCluster(ctx context.Context, tenant, cluster st
return nil
}

func (cs *MyConfigService) buildDTSJson(ctx context.Context, tenant, cluster, src, dst string, srcNode, dstNode *NodeDTO, vtables []*TableDTO, idx int) map[string]interface{} {
jobJson := make(map[string]interface{})
jobBody := make(map[string]interface{})
jobJson["Job"] = jobBody

jobId := tenant + "-" + cluster + "-" + src + "-" + dst + "-" + time.Now().Format("20060102150405")
jobBody["ID"] = jobId
jobBody["Datacenters"] = []string{"dc1"}
jobGroups := make([]interface{}, 0, 2)

jobSrc := make(map[string]interface{})
jobSrc["Name"] = "src"

jobTasks := make([]interface{}, 0, 1)
jobTask := make(map[string]interface{})
jobTask["Name"] = "src"
jobTask["Driver"] = "dtle"
jobConfig := make(map[string]interface{})
jobConfig["Gtid"] = ""
jobReplicate := make([]interface{}, 0, 1)
jobDatabase := make(map[string]interface{})
jobDatabase["TableSchema"] = src
jobDatabase["TableSchemaRename"] = dst
jobTables := []map[string]string{}
for i := range vtables {
vTable := vtables[i]
_, _, dbEnd, _ := config.ParseTopology(vTable.Topology.DbPattern)
tbFormat, _, tbEnd, _ := config.ParseTopology(vTable.Topology.TblPattern)
tableNum := int((tbEnd + 1) / (dbEnd + 1))
for j := 0; j < tableNum; j++ {
jobTable := map[string]string{}
jobTable["TableName"] = fmt.Sprintf(tbFormat, idx*tableNum+j)
jobTable["TableRename"] = fmt.Sprintf(tbFormat, idx*tableNum+j+tbEnd+1)
jobTables = append(jobTables, jobTable)
}
}
jobDatabase["Tables"] = jobTables

jobReplicate = append(jobReplicate, jobDatabase)
jobConfig["ReplicateDoDb"] = jobReplicate
jobSrcConfig := make(map[string]interface{})
jobSrcConfig["Host"] = srcNode.Host
jobSrcConfig["Port"] = srcNode.Port
jobSrcConfig["User"] = srcNode.Username
jobSrcConfig["Password"] = srcNode.Password
jobConfig["SrcConnectionConfig"] = jobSrcConfig
jobDstConfig := make(map[string]interface{})
jobDstConfig["Host"] = dstNode.Host
jobDstConfig["Port"] = dstNode.Port
jobDstConfig["User"] = dstNode.Username
jobDstConfig["Password"] = dstNode.Password
jobConfig["DestConnectionConfig"] = jobDstConfig
jobTask["Config"] = jobConfig
jobSrc["Tasks"] = append(jobTasks, jobTask)
jobGroups = append(jobGroups, jobSrc)

jobDst := make(map[string]interface{})
jobDst["Name"] = "dest"
jobTasks = make([]interface{}, 0, 1)
jobTask = make(map[string]interface{})
jobTask["Name"] = "dest"
jobTask["Driver"] = "dtle"
jobTask["Config"] = map[string]string{"DestType": "mysql"}
jobDst["Tasks"] = append(jobTasks, jobTask)
jobGroups = append(jobGroups, jobDst)
jobBody["TaskGroups"] = jobGroups

return jobJson
}

func (cs *MyConfigService) ExtendCluster(ctx context.Context, tenant, cluster string, body *ClusterDTO) error {
//1、校验node和group,保证node和group翻倍(缩容将node和group减半,流程同理)
groups, err := cs.ListDBGroups(ctx, tenant, cluster)
if err != nil {
return err
}
if len(groups) != len(body.Groups) {
return perrors.Errorf("new groups is not equle to old groups")
}
vtables, err := cs.ListTables(ctx, tenant, cluster)
if err != nil {
return err
}
allNodes, err := cs.ListNodes(ctx, tenant)
if err != nil {
return err
}

//2、创建复制group(物理数据库)的dts任务
//groups[0] --> body.Groups[0]
//groups[1] --> body.Groups[1]
//...
httpClient := &http.Client{}
dtsJobList := make([]map[string]interface{}, 0, len(groups))
dtsEndpoint := config.BootOpts.Dts.Options["endpoints"].(string)
for i := range groups {
srcGroup := groups[i].Name
var srcNode, dstNode *NodeDTO
for n := range allNodes {
if allNodes[n].Database == srcGroup {
srcNode = allNodes[n]
}
}
dstGroup := body.Groups[i]
for n := range allNodes {
if allNodes[n].Database == dstGroup {
dstNode = allNodes[n]
}
}
dtsJob := cs.buildDTSJson(ctx, tenant, cluster, srcGroup, dstGroup, srcNode, dstNode, vtables, i)
if dtsJob == nil {
return perrors.Errorf("failed to build DTS json parameter")
}
dtsJobList = append(dtsJobList, dtsJob)
dtsJobJson, _ := json.Marshal(dtsJob)
httpReq, err := http.NewRequest("POST", dtsEndpoint+"/v1/jobs", bytes.NewBuffer(dtsJobJson))
if err != nil {
return perrors.Errorf("failed to create POST http requst")
}

httpResp, err := httpClient.Do(httpReq)
if err != nil {
return perrors.Errorf("failed to start to replica source group")
}
httpResp.Body.Close()
}

//3、检查是否复制完毕
for {
time.Sleep(5 * time.Second)
finished := false

for i := range dtsJobList {
dtsJob := dtsJobList[i]["Job"].(map[string]interface{})
httpURL := dtsEndpoint + "/v1/job/" + dtsJob["ID"].(string)
httpReq, err := http.NewRequest("GET", httpURL, nil)
if err != nil {
return perrors.Errorf("failed to create GET http requst")
}

httpResp, err := httpClient.Do(httpReq)
if err != nil {
return perrors.Errorf("failed to check replica source group")
}

//TODO: check Status
finished = true
httpResp.Body.Close()
}

if finished {
break
}
}

//4、断开并拒绝所有客户端连接

//5、再次检查是否复制完毕

//6、停止dts任务
for i := range groups {
dtsJob := dtsJobList[i]["Job"].(map[string]interface{})
httpURL := dtsEndpoint + "/v1/job/" + dtsJob["ID"].(string)
httpReq, err := http.NewRequest("DELETE", httpURL, nil)
if err != nil {
return perrors.Errorf("failed to create DELETE http requst")
}

httpResp, err := httpClient.Do(httpReq)
if err != nil {
return perrors.Errorf("failed to stop to replica source group")
}
httpResp.Body.Close()
}

//7、更新groups节点
var groupBody GroupDTO
var groupNode string
for i := range body.Groups {
groupNode = ""
for n := range allNodes {
if allNodes[n].Database == body.Groups[i] {
groupNode = allNodes[n].Name
}
}
if strings.Compare(groupNode, "") == 0 {
continue
}
groupBody.ClusterName = cluster
groupBody.Name = body.Groups[i]
groupBody.Nodes = []string{groupNode}
err = cs.UpsertGroup(ctx, tenant, cluster, groupBody.Name, &groupBody)
if err != nil {
return err
}
}

//8、更新sharding路由
var tableBody TableDTO
for i := range vtables {
vTable := vtables[i]
_, _, dbEnd, err := config.ParseTopology(vTable.Topology.DbPattern)
if err != nil {
return err
}
_, _, tbEnd, err := config.ParseTopology(vTable.Topology.TblPattern)
if err != nil {
return err
}
dbTotal := 2 * (dbEnd + 1)
tableTotal := 2 * (tbEnd + 1)

tableBody.Name = vTable.Name
tableBody.Sequence = vTable.Sequence
tableBody.DbRules = []*config.Rule{
{
Columns: vTable.DbRules[0].Columns,
Type: vTable.DbRules[0].Type,
Expr: "$0 % " + fmt.Sprintf("%d", tableTotal) + " / " + fmt.Sprintf("%d", dbTotal),
},
}
tableBody.TblRules = []*config.Rule{
{
Columns: vTable.TblRules[0].Columns,
Type: vTable.TblRules[0].Type,
Expr: "$0 % " + fmt.Sprintf("%d", tableTotal),
},
}
tableBody.Topology = &config.Topology{
DbPattern: cluster + fmt.Sprintf("_${0000..%04d}", dbTotal-1),
TblPattern: vTable.Name + fmt.Sprintf("_${0000..%04d}", tableTotal-1),
}
tableBody.ShadowTopology = vTable.ShadowTopology
tableBody.Attributes = vTable.Attributes
err = cs.UpsertTable(ctx, tenant, cluster, tableBody.Name, &tableBody)
if err != nil {
return err
}
}

//9、接受客户端连接

return nil
}

func (cs *MyConfigService) RemoveCluster(ctx context.Context, tenant, cluster string) error {
op, err := cs.getCenter(ctx, tenant)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions pkg/admin/router/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func init() {
router.POST("/tenants/:tenant/clusters", CreateCluster)
router.GET("/tenants/:tenant/clusters/:cluster", GetCluster)
router.PUT("/tenants/:tenant/clusters/:cluster", UpdateCluster)
router.POST("/tenants/:tenant/clusters/:cluster", ExtendCluster)
router.DELETE("/tenants/:tenant/clusters/:cluster", RemoveCluster)
})
}
Expand Down Expand Up @@ -107,6 +108,23 @@ func UpdateCluster(c *gin.Context) error {
return nil
}

func ExtendCluster(c *gin.Context) error {
service := admin.GetService(c)
tenant := c.Param("tenant")
cluster := c.Param("cluster")
var clusterBody admin.ClusterDTO
if err := c.ShouldBindJSON(&clusterBody); err != nil {
return exception.Wrap(exception.CodeInvalidParams, err)
}

err := service.ExtendCluster(c, tenant, cluster, &clusterBody)
if err != nil {
return err
}
c.JSON(http.StatusOK, "success")
return nil
}

func RemoveCluster(c *gin.Context) error {
service := admin.GetService(c)
tenant := c.Param("tenant")
Expand Down
5 changes: 4 additions & 1 deletion pkg/config/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/arana-db/arana/pkg/util/log"
)

var BootOpts *BootOptions

// LoadBootOptions loads BootOptions from specified file path.
func LoadBootOptions(path string) (*BootOptions, error) {
content, err := os.ReadFile(path)
Expand All @@ -59,6 +61,7 @@ func LoadBootOptions(path string) (*BootOptions, error) {
return nil, errors.Wrap(err, "failed to validate boot config")
}

BootOpts = &cfg
log.Init(cfg.Logging)
return &cfg, nil
}
Expand All @@ -69,7 +72,7 @@ func LoadTenantOperatorFromPath(path string) (TenantOperator, error) {
if err != nil {
return nil, err
}
if err := Init(*cfg.Config, cfg.Spec.APIVersion); err != nil {
if err = Init(*cfg.Config, cfg.Spec.APIVersion); err != nil {
return nil, err
}

Expand Down
Loading

0 comments on commit cf1112e

Please sign in to comment.