Skip to content

Commit

Permalink
Merge pull request #932 from you-looks-not-tasty/devel-1.9.6-cybershot
Browse files Browse the repository at this point in the history
dpvs-agent dump/launch  services
  • Loading branch information
ywc689 authored Jan 12, 2024
2 parents b7216d1 + 79ca896 commit 351e95d
Show file tree
Hide file tree
Showing 45 changed files with 2,365 additions and 171 deletions.
7 changes: 7 additions & 0 deletions tools/dpvs-agent/cmd/device/put_device_name_addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package device
import (
"github.com/dpvs-agent/pkg/ipc/pool"
"github.com/dpvs-agent/pkg/ipc/types"
"github.com/dpvs-agent/pkg/settings"

apiDevice "github.com/dpvs-agent/restapi/operations/device"

Expand Down Expand Up @@ -52,6 +53,12 @@ func (h *putDeviceAddr) Handle(params apiDevice.PutDeviceNameAddrParams) middlew
if params.Sapool != nil && *params.Sapool {
addr.SetFlags("sapool")
}

if params.Snapshot != nil && *params.Snapshot {
AnnouncePort := settings.ShareSnapshot().NodeSpec.AnnouncePort
AnnouncePort.Dpvs = params.Name
}

// addr.SetValidLft(prarms.Spec.ValidLft)
// addr.SetPreferedLft(prarms.Spec.ValidLft)

Expand Down
51 changes: 32 additions & 19 deletions tools/dpvs-agent/cmd/device/put_device_name_netlink_addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
package device

import (
"errors"
"fmt"
"net"
"strings"

"github.com/vishvananda/netlink"

"github.com/dpvs-agent/pkg/ipc/pool"
"github.com/dpvs-agent/pkg/settings"
apiDevice "github.com/dpvs-agent/restapi/operations/device"

"github.com/go-openapi/runtime/middleware"
Expand All @@ -44,43 +46,54 @@ func NewPutDeviceNetlinkAddr(cp *pool.ConnPool, parentLogger hclog.Logger) *putD
// ip addr add 10.0.0.1/32 dev eth0
func (h *putDeviceNetlinkAddr) Handle(params apiDevice.PutDeviceNameNetlinkAddrParams) middleware.Responder {
// h.logger.Info("/v2/device/", params.Name, "/netlink/addr ", params.Spec.Addr)
if err := NetlinkAddrAdd(params.Spec.Addr, params.Name, h.logger); err != nil {
return apiDevice.NewPutDeviceNameNetlinkAddrInternalServerError()
}
if params.Snapshot != nil && *params.Snapshot {
AnnouncePort := settings.ShareSnapshot().NodeSpec.AnnouncePort
AnnouncePort.Switch = params.Name
}
return apiDevice.NewPutDeviceNameNetlinkAddrOK()
}

func NetlinkAddrAdd(addr, device string, logger hclog.Logger) error {
var cidr string
if strings.Count(params.Spec.Addr, "/") == 0 {
ip := net.ParseIP(params.Spec.Addr)
if strings.Count(addr, "/") == 0 {
ip := net.ParseIP(addr)
if ip == nil {
h.logger.Info("Parse IP failed.", "Addr", params.Spec.Addr)
return apiDevice.NewPutDeviceNameNetlinkAddrInternalServerError()
logger.Info("Parse IP failed.", "Addr", addr)
return errors.New("Parse IP Failed.")
}
if ip.To4() != nil {
cidr = params.Spec.Addr + "/32"
cidr = addr + "/32"
} else {
cidr = params.Spec.Addr + "/128"
cidr = addr + "/128"
}
} else {
cidr = params.Spec.Addr
cidr = addr
}

ip, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
h.logger.Error("Parse CIDR failed.", "cidr", cidr, "Error", err.Error())
return apiDevice.NewPutDeviceNameNetlinkAddrInternalServerError()
logger.Error("Parse CIDR failed.", "cidr", cidr, "Error", err.Error())
return err
}

ipnet.IP = ip
addr := &netlink.Addr{IPNet: ipnet}
netlinkAddr := &netlink.Addr{IPNet: ipnet}

link, err := netlink.LinkByName(params.Name)
link, err := netlink.LinkByName(device)
if err != nil {
h.logger.Error("netlink.LinkByName() failed.", "Device Name", params.Name, "Error", err.Error())
return apiDevice.NewPutDeviceNameNetlinkAddrInternalServerError()
logger.Error("netlink.LinkByName() failed.", "device", device, "Error", err.Error())
return err
}

if err := netlink.AddrAdd(link, addr); err != nil {
h.logger.Error("netlink.AddrAdd() failed.", "Error", err.Error())
return apiDevice.NewPutDeviceNameNetlinkAddrInternalServerError()
if err := netlink.AddrAdd(link, netlinkAddr); err != nil {
logger.Error("netlink.AddrAdd() failed.", "Error", err.Error())
return err
}

cmd := fmt.Sprintf("ip addr add %s dev %s", cidr, params.Name)
h.logger.Info("Device add Addr success.", "cmd", cmd)
return apiDevice.NewPutDeviceNameNetlinkAddrOK()
cmd := fmt.Sprintf("ip addr add %s dev %s", cidr, device)
logger.Info("Device add Addr success.", "cmd", cmd)
return nil
}
85 changes: 80 additions & 5 deletions tools/dpvs-agent/cmd/dpvs-agent-server/api_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package main
import (
"context"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"strings"
"time"

Expand All @@ -28,6 +30,7 @@ import (
"github.com/dpvs-agent/cmd/device"
"github.com/dpvs-agent/cmd/ipvs"
"github.com/dpvs-agent/pkg/ipc/pool"
"github.com/dpvs-agent/pkg/settings"
"github.com/dpvs-agent/restapi"
"github.com/dpvs-agent/restapi/operations"
)
Expand All @@ -37,7 +40,9 @@ var (
)

type DpvsAgentServer struct {
InitMode string `long:"init-mode" description:"load service from network or local config file. the options is [network|local]" default:"network"`
LogDir string `long:"log-dir" description:"default log dir is /var/log/ And log name dpvs-agent.log" default:"/var/log/"`
CacheFile string `long:"cache-file" description:"a file path which used to dump the running dpvs active virtual service. we can load it while init by *local* mode and resume dpvs enviroment. if the file path is not specified, there is named with 'dpvs.cache' and store in 'conf.d' which is a subdir of 'LogDir' point to." default:""`
IpcSocketPath string `long:"ipc-sockopt-path" description:"default ipc socket path /var/run/dpvs.ipc" default:"/var/run/dpvs.ipc"`
restapi.Server
}
Expand Down Expand Up @@ -65,6 +70,64 @@ func unixDialer(ctx context.Context) (net.Conn, error) {
return nil, errors.New("unknown error")
}

func validFile(fileName string) error {
filePath := fileName[:strings.LastIndex(fileName, "/")]
pathInfo, err := os.Stat(filePath)
if err != nil {
if os.IsNotExist(err) {
err = os.MkdirAll(filePath, os.ModePerm)
if err != nil {
return err
}
return nil
}
return err
}

if !pathInfo.IsDir() {
return errors.New(fmt.Sprintf("%s is file", pathInfo.Name()))
}

fileInfo, err := os.Stat(fileName)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}

if fileInfo.IsDir() {
return errors.New(fmt.Sprintf("%s is dir", fileInfo.Name()))
}

return nil
}

func getFilePath(baseDir, defaultSubdir, targetFile, defaultName string) string {
if len(targetFile) == 0 || strings.EqualFold(targetFile, "/") {
return filepath.Join(baseDir, defaultSubdir, defaultName)
} else {
if strings.HasPrefix(targetFile, "/") {
if strings.HasSuffix(targetFile, "/") {
return filepath.Join(targetFile, defaultName)
} else {
return targetFile
}
} else {
if strings.Count(targetFile, "/") == 0 {
return filepath.Join(baseDir, defaultSubdir, targetFile)
} else {
if strings.HasSuffix(targetFile, "/") {
return filepath.Join(baseDir, targetFile, defaultName)
} else {
return filepath.Join(baseDir, targetFile)
}
}
}
}
return targetFile
}

func (agent *DpvsAgentServer) instantiateAPI(restAPI *operations.DpvsAgentAPI) {
if strings.HasSuffix(agent.IpcSocketPath, ".ipc") {
s, err := os.Stat(agent.IpcSocketPath)
Expand Down Expand Up @@ -93,12 +156,17 @@ func (agent *DpvsAgentServer) instantiateAPI(restAPI *operations.DpvsAgentAPI) {
}
}

sep := "/"
if strings.HasSuffix(logDir, "/") {
sep = ""
cacheFile := getFilePath(logDir, "conf.d", agent.CacheFile, "dpvs.cache")
if err := validFile(cacheFile); err != nil {
panic(err)
}
appConf := settings.ShareAppConfig()
appConf.CacheFile = cacheFile

logFile := strings.Join([]string{logDir, "dpvs-agent.log"}, sep)
logFile := getFilePath(logDir, ".", "", "dpvs-agent.log")
if err := validFile(logFile); err != nil {
panic(err)
}
// logOpt := &hclog.LoggerOptions{Name: logFile}
var logOpt *hclog.LoggerOptions
logFileNamePattern := strings.Join([]string{logFile, "%Y%m%d%H%M"}, "-")
Expand All @@ -109,7 +177,6 @@ func (agent *DpvsAgentServer) instantiateAPI(restAPI *operations.DpvsAgentAPI) {
rotatelogs.WithLinkName(logFile),
rotatelogs.WithRotationTime(logRotationInterval),
)
// f, err := os.Create(logFile)
if err == nil {
logOpt = &hclog.LoggerOptions{Name: logFile, Output: logF}
} else {
Expand All @@ -136,6 +203,7 @@ func (agent *DpvsAgentServer) instantiateAPI(restAPI *operations.DpvsAgentAPI) {
restAPI.VirtualserverPutVsVipPortHandler = ipvs.NewPutVsItem(cp, logger)
restAPI.VirtualserverPutVsVipPortLaddrHandler = ipvs.NewPutVsLaddr(cp, logger)
restAPI.VirtualserverPutVsVipPortRsHandler = ipvs.NewPutVsRs(cp, logger)
restAPI.VirtualserverPutVsVipPortRsHealthHandler = ipvs.NewPutVsRsHealth(cp, logger)
restAPI.VirtualserverPutVsVipPortDenyHandler = ipvs.NewPutVsDeny(cp, logger)
restAPI.VirtualserverPutVsVipPortAllowHandler = ipvs.NewPutVsAllow(cp, logger)

Expand All @@ -161,6 +229,13 @@ func (agent *DpvsAgentServer) instantiateAPI(restAPI *operations.DpvsAgentAPI) {
restAPI.DeviceDeleteDeviceNameRouteHandler = device.NewDelDeviceRoute(cp, logger)
restAPI.DeviceDeleteDeviceNameVlanHandler = device.NewDelDeviceVlan(cp, logger)
restAPI.DeviceDeleteDeviceNameNetlinkAddrHandler = device.NewDelDeviceNetlinkAddr(cp, logger)

switch strings.ToLower(agent.InitMode) {
case "network":
case "local":
agent.Host = "127.0.0.1"
agent.LocalLoad(cp, logger)
}
}

func (agent *DpvsAgentServer) InstantiateServer(api *operations.DpvsAgentAPI) *restapi.Server {
Expand Down
121 changes: 121 additions & 0 deletions tools/dpvs-agent/cmd/dpvs-agent-server/local_init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2023 IQiYi Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"strings"

"github.com/dpvs-agent/cmd/device"
"github.com/dpvs-agent/pkg/ipc/pool"
"github.com/dpvs-agent/pkg/ipc/types"
"github.com/dpvs-agent/pkg/settings"

"github.com/hashicorp/go-hclog"
)

func (agent *DpvsAgentServer) LocalLoad(cp *pool.ConnPool, parentLogger hclog.Logger) error {
var errs []error
logger := hclog.Default().Named("LoadConfigFile")
if parentLogger != nil {
logger = parentLogger.Named("LoadConfigFile")
}

snapshot := settings.ShareSnapshot()
if err := snapshot.LoadFrom(settings.LocalConfigFile(), logger); err != nil {
return err
}

announcePort := snapshot.NodeSpec.AnnouncePort
laddrs := snapshot.NodeSpec.Laddrs

for _, service := range snapshot.Services {
// 1> ipvsadm -A vip:port -s wrr
vs := types.NewVirtualServerSpec()
vs.SetAddr(service.Addr)
vs.SetPort(service.Port)
vs.SetProto(service.Proto)
vs.SetFwmark(service.Fwmark)
vs.SetConnTimeout(service.ConnTimeout)
vs.SetBps(service.Bps)
vs.SetLimitProportion(service.LimitProportion)
vs.SetTimeout(service.Timeout)
vs.SetSchedName(service.SchedName)
flags := strings.ToLower(service.Flags)
if strings.Index(flags, "expirequiescent") != -1 {
vs.SetFlagsExpireQuiescent()
}
if strings.Index(flags, "synproxy") != -1 {
vs.SetFlagsSynProxy()
}
if strings.Index(flags, "conhashbysrcip") != -1 {
vs.SetFlagsHashSrcIP()
}
if strings.Index(flags, "conhashbyquicid") != -1 {
vs.SetFlagsHashQuicID()
}
vs.Add(cp, logger)
// 2> dpip addr add ${vip} dev ${device}
svcAddr := types.NewInetAddrDetail()
svcAddr.SetAddr(service.Addr)
svcAddr.SetIfName(announcePort.Dpvs)
svcAddr.Add(cp, logger)

// 3> ipvsadm -at ${VIPPORT} -r ${RS:PORT} -w ${WEIGHT} -b
rsFront := types.NewRealServerFront()
if err := rsFront.ParseVipPortProto(vs.ID()); err != nil {
errs = append(errs, err)
}
rss := make([]*types.RealServerSpec, len(service.RSs.Items))
for i, rs := range service.RSs.Items {
var fwdmode types.DpvsFwdMode
fwdmode.FromString(rs.Spec.Mode)
rss[i] = types.NewRealServerSpec()
rss[i].SetPort(rs.Spec.Port)
rss[i].SetWeight(uint32(rs.Spec.Weight))
rss[i].SetProto(uint16(service.Proto))
rss[i].SetAddr(rs.Spec.IP)
rss[i].SetFwdMode(fwdmode)
}

rsFront.Update(rss, cp, logger)
// 4> bind laddr with vs (ipvsadm --add-laddr -z ${LADDR} -t ${VIPPORT} -F ${device})
laddr := types.NewLocalAddrFront()
if err := laddr.ParseVipPortProto(vs.ID()); err != nil {
}
lds := make([]*types.LocalAddrDetail, len(laddrs.Items))
for i, lip := range laddrs.Items {
lds[i] = types.NewLocalAddrDetail()
lds[i].SetAddr(lip.Addr)
lds[i].SetIfName(lip.Device)
}
laddr.Add(lds, cp, logger)
// 5> ip addr add ${VIP} dev ${KNIDEVICE(lo?)}
if err := device.NetlinkAddrAdd(service.Addr, announcePort.Switch, logger); err != nil {
logger.Error("add addr", service.Addr, "onto device failed")
errs = append(errs, err)
}
}
// 6> dpip addr add ${LADDR} dev ${device}
for _, lip := range laddrs.Items {
lipAddr := types.NewInetAddrDetail()
lipAddr.SetAddr(lip.Addr)
lipAddr.SetIfName(lip.Device)
lipAddr.SetFlags("sapool")
resultCode := lipAddr.Add(cp, logger)
logger.Info("Add addr to device done.", "Device", lip.Device, "Addr", lip.Addr, "result", resultCode.String())
}

return settings.MergedError(errs)
}
3 changes: 3 additions & 0 deletions tools/dpvs-agent/cmd/ipvs/delete_vs_vip_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ipvs
import (
"github.com/dpvs-agent/pkg/ipc/pool"
"github.com/dpvs-agent/pkg/ipc/types"
"github.com/dpvs-agent/pkg/settings"

apiVs "github.com/dpvs-agent/restapi/operations/virtualserver"

Expand Down Expand Up @@ -47,9 +48,11 @@ func (h *delVsItem) Handle(params apiVs.DeleteVsVipPortParams) middleware.Respon
result := vs.Del(h.connPool, h.logger)
switch result {
case types.EDPVS_OK:
settings.ShareSnapshot().ServiceDel(params.VipPort)
h.logger.Info("Del virtual server success.", "VipPort", params.VipPort)
return apiVs.NewDeleteVsVipPortOK()
case types.EDPVS_NOTEXIST:
settings.ShareSnapshot().ServiceDel(params.VipPort)
h.logger.Warn("Del a not exist virtual server done.", "VipPort", params.VipPort, "result", result.String())
return apiVs.NewDeleteVsVipPortNotFound()
default:
Expand Down
Loading

0 comments on commit 351e95d

Please sign in to comment.