Skip to content

Commit

Permalink
[dash] add zmq_dpu_proxy_address_base parameter to telemetry.go
Browse files Browse the repository at this point in the history
This allows to configure the gnmi server to send the DASH configuration
to the DashOffloadManager instead of the DPU
  • Loading branch information
Yakiv-Huryk committed Nov 7, 2024
1 parent e844925 commit 56790a0
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 16 deletions.
2 changes: 1 addition & 1 deletion gnmi_server/client_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) {
if origin == "openconfig" {
dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions, sdc.TranslWildcardOption{})
} else if IsNativeOrigin(origin) {
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, gnmipb.Encoding_JSON_IETF, "", "")
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, gnmipb.Encoding_JSON_IETF, "", "", "")
} else if len(origin) != 0 {
return grpc.Errorf(codes.Unimplemented, "Unsupported origin: %s", origin)
} else if target == "" {
Expand Down
7 changes: 4 additions & 3 deletions gnmi_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Config struct {
EnableTranslibWrite bool
EnableNativeWrite bool
ZmqPort string
DpuProxyBaseAddr string
IdleConnDuration int
ConfigTableName string
Vrf string
Expand Down Expand Up @@ -410,7 +411,7 @@ func (s *Server) Get(ctx context.Context, req *gnmipb.GetRequest) (*gnmipb.GetRe
}
}
if check := IsNativeOrigin(origin); check {
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, encoding, s.config.ZmqPort, s.config.Vrf)
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, encoding, s.config.ZmqPort, s.config.Vrf, s.config.DpuProxyBaseAddr)
} else {
dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions)
}
Expand Down Expand Up @@ -508,7 +509,7 @@ func (s *Server) Set(ctx context.Context, req *gnmipb.SetRequest) (*gnmipb.SetRe
common_utils.IncCounter(common_utils.GNMI_SET_FAIL)
return nil, grpc.Errorf(codes.Unimplemented, "GNMI native write is disabled")
}
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, encoding, s.config.ZmqPort, s.config.Vrf)
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, encoding, s.config.ZmqPort, s.config.Vrf, s.config.DpuProxyBaseAddr)
} else {
if s.config.EnableTranslibWrite == false {
common_utils.IncCounter(common_utils.GNMI_SET_FAIL)
Expand Down Expand Up @@ -585,7 +586,7 @@ func (s *Server) Capabilities(ctx context.Context, req *gnmipb.CapabilityRequest
var supportedModels []gnmipb.ModelData
dc, _ := sdc.NewTranslClient(nil, nil, ctx, extensions)
supportedModels = append(supportedModels, dc.Capabilities()...)
dc, _ = sdc.NewMixedDbClient(nil, nil, "", gnmipb.Encoding_JSON_IETF, s.config.ZmqPort, s.config.Vrf)
dc, _ = sdc.NewMixedDbClient(nil, nil, "", gnmipb.Encoding_JSON_IETF, s.config.ZmqPort, s.config.Vrf, s.config.DpuProxyBaseAddr)
supportedModels = append(supportedModels, dc.Capabilities()...)

suppModels := make([]*gnmipb.ModelData, len(supportedModels))
Expand Down
12 changes: 6 additions & 6 deletions sonic_data_client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,17 +756,17 @@ func TestGetDpuAddress(t *testing.T) {
}

// test get ZMQ address
address, err = getZmqAddress("dpu0", "1234")
address, err = getZmqAddress("dpu0", "1234", "")
if address != "tcp://127.0.0.2:1234" {
t.Errorf("get invalid DPU address failed")
}

address, err = getZmqAddress("dpu0", "")
address, err = getZmqAddress("dpu0", "", "")
if err == nil {
t.Errorf("get invalid ZMQ address failed")
}

address, err = getZmqAddress("", "1234")
address, err = getZmqAddress("", "1234", "")
if err == nil {
t.Errorf("get invalid ZMQ address failed")
}
Expand All @@ -793,17 +793,17 @@ func TestGetZmqClient(t *testing.T) {
dpusTable.Hset("dpu0", "midplane_interface", "dpu0")
dhcpPortTable.Hset("bridge-midplane|dpu0", "ips@", "127.0.0.2,127.0.0.1")

client, err := getZmqClient("dpu0", "", "")
client, err := getZmqClient("dpu0", "", "", "")
if client != nil || err != nil {
t.Errorf("empty ZMQ port should not get ZMQ client")
}

client, err = getZmqClient("dpu0", "1234", "")
client, err = getZmqClient("dpu0", "1234", "", "")
if client == nil {
t.Errorf("get ZMQ client failed")
}

client, err = getZmqClient("", "1234", "")
client, err = getZmqClient("", "1234", "", "")
if client == nil {
t.Errorf("get ZMQ client failed")
}
Expand Down
46 changes: 40 additions & 6 deletions sonic_data_client/mixed_db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,47 @@ func getDpuAddress(dpuId string) (string, error) {
return dpuAddressArray[0], nil
}

func getZmqAddress(container string, zmqPort string) (string, error) {
func getDpuProxyAddress(dpuId string, dpuProxyBaseAddr string) (string, error) {
dpuIndexStr := strings.TrimPrefix(dpuId, "dpu")
dpuIndex, err := strconv.Atoi(dpuIndexStr)
if err != nil {
return "", fmt.Errorf("Failed to parse DPU index from %s: %v", dpuId, err)
}

baseIp := net.ParseIP(dpuProxyBaseAddr)
if baseIp == nil {
return "", fmt.Errorf("Invalid DPU proxy base address: %s", dpuProxyBaseAddr)
}

baseIp = baseIp.To4()
if baseIp == nil {
return "", fmt.Errorf("Expecting an IPv4 address for DPU proxy: %s", dpuProxyBaseAddr)
}

lastOctet := int(baseIp[3]) + dpuIndex
if lastOctet > 255 {
return "", fmt.Errorf("DPU index is out of range")
}

baseIp[3] = byte(lastOctet)
return baseIp.String(), nil
}

func getZmqAddress(container string, zmqPort string, dpuProxyBaseAddr string) (string, error) {
// when zmqPort empty, ZMQ feature disabled
if zmqPort == "" {
return "", fmt.Errorf("ZMQ port is empty.")
}

var dpuAddress, err = getDpuAddress(container)
var dpuAddress string
var err error

if dpuProxyBaseAddr != "" {
dpuAddress, err = getDpuProxyAddress(container, dpuProxyBaseAddr)
} else {
dpuAddress, err = getDpuAddress(container)
}

if err != nil {
return "", fmt.Errorf("Get DPU address failed: %v", err)
}
Expand Down Expand Up @@ -181,7 +215,7 @@ func removeZmqClient(zmqClient swsscommon.ZmqClient) (error) {
return fmt.Errorf("Can't find ZMQ client in zmqClientMap: %v", zmqClient)
}

func getZmqClient(dpuId string, zmqPort string, vrf string) (swsscommon.ZmqClient, error) {
func getZmqClient(dpuId string, zmqPort string, vrf string, dpuProxyBaseAddr string) (swsscommon.ZmqClient, error) {
if zmqPort == "" {
// ZMQ feature disabled when zmqPort flag not set
return nil, nil
Expand All @@ -192,7 +226,7 @@ func getZmqClient(dpuId string, zmqPort string, vrf string) (swsscommon.ZmqClien
return getZmqClientByAddress("tcp://" + LOCAL_ADDRESS + ":" + zmqPort, vrf)
}

zmqAddress, err := getZmqAddress(dpuId, zmqPort)
zmqAddress, err := getZmqAddress(dpuId, zmqPort, dpuProxyBaseAddr)
if err != nil {
return nil, fmt.Errorf("Get ZMQ address failed: %v", err)
}
Expand Down Expand Up @@ -493,7 +527,7 @@ func init() {
initRedisDbMap()
}

func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string, encoding gnmipb.Encoding, zmqPort string, vrf string) (Client, error) {
func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string, encoding gnmipb.Encoding, zmqPort string, vrf string, dpuProxyBaseAddr string) (Client, error) {
var err error

// Initialize RedisDbMap for test
Expand Down Expand Up @@ -556,7 +590,7 @@ func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string,
client.workPath = common_utils.GNMI_WORK_PATH

// continer is DPU ID
client.zmqClient, err = getZmqClient(container, zmqPort, vrf)
client.zmqClient, err = getZmqClient(container, zmqPort, vrf, dpuProxyBaseAddr)
if err != nil {
return nil, fmt.Errorf("Get ZMQ client failed: %v", err)
}
Expand Down
3 changes: 3 additions & 0 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type TelemetryConfig struct {
ConfigTableName *string
ZmqAddress *string
ZmqPort *string
DashProxyAddr *string
Insecure *bool
NoTLS *bool
AllowNoClientCert *bool
Expand Down Expand Up @@ -155,6 +156,7 @@ func setupFlags(fs *flag.FlagSet) (*TelemetryConfig, *gnmi.Config, error) {
ConfigTableName: fs.String("config_table_name", "", "Config table name"),
ZmqAddress: fs.String("zmq_address", "", "Orchagent ZMQ address, deprecated, please use zmq_port."),
ZmqPort: fs.String("zmq_port", "", "Orchagent ZMQ port, when not set or empty string telemetry server will switch to Redis based communication channel."),
DashProxyAddr: fs.String("zmq_dpu_proxy_address_base", "", "Dash offload manager ZMQ base address, when set, the DPU configuration will be send to the proxy address instead of directly to the DPU."),
Insecure: fs.Bool("insecure", false, "Skip providing TLS cert and key, for testing only!"),
NoTLS: fs.Bool("noTLS", false, "disable TLS, for testing only!"),
AllowNoClientCert: fs.Bool("allow_no_client_auth", false, "When set, telemetry server will request but not require a client certificate."),
Expand Down Expand Up @@ -242,6 +244,7 @@ func setupFlags(fs *flag.FlagSet) (*TelemetryConfig, *gnmi.Config, error) {
}

cfg.ZmqPort = zmqPort
cfg.DpuProxyBaseAddr = *telemetryCfg.DashProxyAddr

return telemetryCfg, cfg, nil
}
Expand Down

0 comments on commit 56790a0

Please sign in to comment.