diff --git a/.github/workflows/semgrep.yml b/.github/workflows/semgrep.yml new file mode 100644 index 00000000..975769a5 --- /dev/null +++ b/.github/workflows/semgrep.yml @@ -0,0 +1,22 @@ +name: Semgrep + +on: + pull_request: {} + push: + branches: + - master + - '201[7-9][0-1][0-9]' + - '202[0-9][0-1][0-9]' + +jobs: + semgrep: + if: github.repository_owner == 'sonic-net' + name: Semgrep + runs-on: ubuntu-latest + container: + image: returntocorp/semgrep + steps: + - uses: actions/checkout@v3 + - run: semgrep ci + env: + SEMGREP_RULES: p/default diff --git a/Makefile b/Makefile index e19bb496..ab9578b3 100644 --- a/Makefile +++ b/Makefile @@ -82,17 +82,25 @@ endif swsscommon_wrap: make -C swsscommon -check_gotest: +DBCONFG = $(DBDIR)/database_config.json +ENVFILE = build/test/env.txt +TESTENV = $(shell cat $(ENVFILE)) + +$(DBCONFG): testdata/database_config.json sudo mkdir -p ${DBDIR} sudo cp ./testdata/database_config.json ${DBDIR} - sudo mkdir -p /usr/models/yang || true - sudo find $(MGMT_COMMON_DIR)/models -name '*.yang' -exec cp {} /usr/models/yang/ \; + +$(ENVFILE): + mkdir -p $(@D) + tools/test/env.sh | grep -v DB_CONFIG_PATH | tee $@ + +check_gotest: $(DBCONFG) $(ENVFILE) sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-config.txt -covermode=atomic -v github.com/sonic-net/sonic-gnmi/sonic_db_config - sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-gnmi.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/gnmi_server -coverpkg ../... - sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -coverprofile=coverage-dialcout.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/dialout/dialout_client + sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(TESTENV) $(GO) test -race -timeout 20m -coverprofile=coverage-gnmi.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/gnmi_server -coverpkg ../... + sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(TESTENV) $(GO) test -coverprofile=coverage-dialcout.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/dialout/dialout_client sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-data.txt -covermode=atomic -mod=vendor -v github.com/sonic-net/sonic-gnmi/sonic_data_client sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-dbus.txt -covermode=atomic -mod=vendor -v github.com/sonic-net/sonic-gnmi/sonic_service_client - sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-translutils.txt -covermode=atomic -mod=vendor -v github.com/sonic-net/sonic-gnmi/transl_utils + sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(TESTENV) $(GO) test -race -coverprofile=coverage-translutils.txt -covermode=atomic -mod=vendor -v github.com/sonic-net/sonic-gnmi/transl_utils $(GO) get github.com/axw/gocov/... $(GO) get github.com/AlekSi/gocov-xml $(GO) mod vendor diff --git a/azure-pipelines.yml b/azure-pipelines.yml index afe10128..1fabbae8 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -154,7 +154,7 @@ stages: - script: | pushd sonic-gnmi - make check_gotest + make check_gotest ENABLE_TRANSLIB_WRITE=y displayName: "Test" - publish: $(Build.ArtifactStagingDirectory)/ diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index 85d0d193..6a919460 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -5,11 +5,13 @@ import ( "io" "net" "sync" + "strings" "github.com/Workiva/go-datastructures/queue" log "github.com/golang/glog" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" sdc "github.com/sonic-net/sonic-gnmi/sonic_data_client" gnmipb "github.com/openconfig/gnmi/proto/gnmi" ) @@ -128,23 +130,23 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { return grpc.Errorf(codes.InvalidArgument, "first message must be SubscriptionList: %q", query) } - var target string prefix := c.subscribe.GetPrefix() - if prefix == nil { - return grpc.Errorf(codes.Unimplemented, "No target specified in prefix") - } else { - target = prefix.GetTarget() - // TODO: add data client support for fetching non-db data - if target == "" { - return grpc.Errorf(codes.Unimplemented, "Empty target data not supported yet") - } - } + origin := prefix.GetOrigin() + target := prefix.GetTarget() paths, err := c.populateDbPathSubscrition(c.subscribe) if err != nil { return grpc.Errorf(codes.NotFound, "Invalid subscription path: %v %q", err, query) } + if o, err := ParseOrigin(paths); err != nil { + return err // origin conflict within paths + } else if len(origin) == 0 { + origin = o // Use origin from paths if not given in prefix + } else if len(o) != 0 && o != origin { + return status.Error(codes.InvalidArgument, "Origin conflict between prefix and paths") + } + if connectionKey, valid = connectionManager.Add(c.addr, query.String()); !valid { return grpc.Errorf(codes.Unavailable, "Server connections are at capacity.") } @@ -155,7 +157,18 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { mode := c.subscribe.GetMode() - if target == "OTHERS" { + log.V(3).Infof("mode=%v, origin=%q, target=%q", mode, origin, target) + + if origin == "openconfig" { + dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions, sdc.TranslWildcardOption{}) + } else if len(origin) != 0 { + return grpc.Errorf(codes.Unimplemented, "Unsupported origin: %s", origin) + } else if target == "" { + // This and subsequent conditions handle target based path identification + // when origin == "". As per the spec it should have been treated as "openconfig". + // But we take a deviation and stick to legacy logic for backward compatibility + return grpc.Errorf(codes.Unimplemented, "Empty target data not supported") + } else if target == "OTHERS" { dc, err = sdc.NewNonDbClient(paths, prefix) } else if ((target == "EVENTS") && (mode == gnmipb.SubscriptionList_STREAM)) { dc, err = sdc.NewEventClient(paths, prefix, c.logLevel) @@ -163,7 +176,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { dc, err = sdc.NewDbClient(paths, prefix) } else { /* For any other target or no target create new Transl Client. */ - dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions) + dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions, sdc.TranslWildcardOption{}) } if err != nil { @@ -195,6 +208,9 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { c.Close() // Wait until all child go routines exited c.w.Wait() + if strings.Contains(err.Error(), "i/o timeout") { + return grpc.Errorf(codes.Internal, "%s", err) + } return grpc.Errorf(codes.InvalidArgument, "%s", err) } diff --git a/gnmi_server/server_test.go b/gnmi_server/server_test.go index 39f57298..d712e363 100644 --- a/gnmi_server/server_test.go +++ b/gnmi_server/server_test.go @@ -228,7 +228,7 @@ func runTestGet(t *testing.T, ctx context.Context, gClient pb.GNMIClient, pathTa textPbPath string, wantRetCode codes.Code, wantRespVal interface{}, valTest bool) { //var retCodeOk bool // Send request - + t.Helper() var pbPath pb.Path if err := proto.UnmarshalText(textPbPath, &pbPath); err != nil { t.Fatalf("error in unmarshaling path: %v %v", textPbPath, err) @@ -276,6 +276,9 @@ func runTestGet(t *testing.T, ctx context.Context, gClient pb.GNMIClient, pathTa t.Fatalf("error in unmarshaling IETF JSON data to json container: %v", err) } var wantJSONStruct interface{} + if v, ok := wantRespVal.(string); ok { + wantRespVal = []byte(v) + } if err := json.Unmarshal(wantRespVal.([]byte), &wantJSONStruct); err != nil { t.Fatalf("error in unmarshaling IETF JSON data to json container: %v", err) } @@ -302,10 +305,12 @@ type op_t int const ( Delete op_t = 1 Replace op_t = 2 + Update op_t = 3 ) func runTestSet(t *testing.T, ctx context.Context, gClient pb.GNMIClient, pathTarget string, textPbPath string, wantRetCode codes.Code, wantRespVal interface{}, attributeData string, op op_t) { + t.Helper() // Send request var pbPath pb.Path if err := proto.UnmarshalText(textPbPath, &pbPath); err != nil { @@ -313,21 +318,34 @@ func runTestSet(t *testing.T, ctx context.Context, gClient pb.GNMIClient, pathTa } req := &pb.SetRequest{} switch op { - case Replace: + case Replace, Update: prefix := pb.Path{Target: pathTarget} var v *pb.TypedValue v = &pb.TypedValue{ Value: &pb.TypedValue_JsonIetfVal{JsonIetfVal: extractJSON(attributeData)}} + data := []*pb.Update{{Path: &pbPath, Val: v}} req = &pb.SetRequest{ Prefix: &prefix, - Replace: []*pb.Update{&pb.Update{Path: &pbPath, Val: v}}, + } + if op == Replace { + req.Replace = data + } else { + req.Update = data } case Delete: req = &pb.SetRequest{ Delete: []*pb.Path{&pbPath}, } } + + runTestSetRaw(t, ctx, gClient, req, wantRetCode) +} + +func runTestSetRaw(t *testing.T, ctx context.Context, gClient pb.GNMIClient, req *pb.SetRequest, + wantRetCode codes.Code) { + t.Helper() + _, err := gClient.Set(ctx, req) gotRetStatus, ok := status.FromError(err) if !ok { @@ -340,6 +358,26 @@ func runTestSet(t *testing.T, ctx context.Context, gClient pb.GNMIClient, pathTa } } +// pathToPb converts string representation of gnmi path to protobuf format +func pathToPb(s string) string { + p, _ := ygot.StringToStructuredPath(s) + return proto.MarshalTextString(p) +} + +func removeModulePrefixFromPathPb(t *testing.T, s string) string { + t.Helper() + var p pb.Path + if err := proto.UnmarshalText(s, &p); err != nil { + t.Fatalf("error unmarshaling path: %v %v", s, err) + } + for _, ele := range p.Elem { + if k := strings.IndexByte(ele.Name, ':'); k != -1 { + ele.Name = ele.Name[k+1:] + } + } + return proto.MarshalTextString(&p) +} + func runServer(t *testing.T, s *Server) { //t.Log("Starting RPC server on address:", s.Address()) err := s.Serve() // blocks until close @@ -753,8 +791,6 @@ func TestGnmiSet(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - var emptyRespVal interface{} - tds := []struct { desc string pathTarget string @@ -765,29 +801,28 @@ func TestGnmiSet(t *testing.T) { operation op_t valTest bool }{ + { + desc: "Invalid path", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/openconfig-interfaces:interfaces/interface[name=Ethernet4]/unknown"), + wantRetCode: codes.Unknown, + operation: Delete, + }, { desc: "Set OC Interface MTU", pathTarget: "OC_YANG", - textPbPath: ` - elem: elem: > - `, + textPbPath: pathToPb("openconfig-interfaces:interfaces/interface[name=Ethernet4]/config"), attributeData: "../testdata/set_interface_mtu.json", wantRetCode: codes.OK, - wantRespVal: emptyRespVal, - operation: Replace, - valTest: false, + operation: Update, }, { desc: "Set OC Interface IP", pathTarget: "OC_YANG", - textPbPath: ` - elem: elem: > elem: elem: > - `, + textPbPath: pathToPb("/openconfig-interfaces:interfaces/interface[name=Ethernet4]/subinterfaces/subinterface[index=0]/openconfig-if-ip:ipv4"), attributeData: "../testdata/set_interface_ipv4.json", wantRetCode: codes.OK, - wantRespVal: emptyRespVal, - operation: Replace, - valTest: false, + operation: Update, }, // { // desc: "Check OC Interface values set", @@ -807,19 +842,82 @@ func TestGnmiSet(t *testing.T) { `, attributeData: "", wantRetCode: codes.OK, - wantRespVal: emptyRespVal, operation: Delete, valTest: false, }, + { + desc: "Set OC Interface IPv6 (unprefixed path)", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/interfaces/interface[name=Ethernet0]/subinterfaces/subinterface[index=0]/ipv6/addresses/address"), + attributeData: `{"address": [{"ip": "150::1","config": {"ip": "150::1","prefix-length": 80}}]}`, + wantRetCode: codes.OK, + operation: Update, + }, + { + desc: "Delete OC Interface IPv6 (unprefixed path)", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/interfaces/interface[name=Ethernet0]/subinterfaces/subinterface[index=0]/ipv6/addresses/address[ip=150::1]"), + wantRetCode: codes.OK, + operation: Delete, + }, + { + desc: "Create ACL (unprefixed path)", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/acl/acl-sets/acl-set"), + attributeData: `{"acl-set": [{"name": "A001", "type": "ACL_IPV4", + "config": {"name": "A001", "type": "ACL_IPV4", "description": "hello, world!"}}]}`, + wantRetCode: codes.OK, + operation: Update, + }, + { + desc: "Verify Create ACL", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/openconfig-acl:acl/acl-sets/acl-set[name=A001][type=ACL_IPV4]/config/description"), + wantRespVal: `{"openconfig-acl:description": "hello, world!"}`, + wantRetCode: codes.OK, + valTest: true, + }, + { + desc: "Replace ACL Description (unprefixed path)", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/acl/acl-sets/acl-set[name=A001][type=ACL_IPV4]/config/description"), + attributeData: `{"description": "dummy"}`, + wantRetCode: codes.OK, + operation: Replace, + }, + { + desc: "Verify Replace ACL Description", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/openconfig-acl:acl/acl-sets/acl-set[name=A001][type=ACL_IPV4]/config/description"), + wantRespVal: `{"openconfig-acl:description": "dummy"}`, + wantRetCode: codes.OK, + valTest: true, + }, + { + desc: "Delete ACL", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/openconfig-acl:acl/acl-sets/acl-set[name=A001][type=ACL_IPV4]"), + wantRetCode: codes.OK, + operation: Delete, + }, + { + desc: "Verify Delete ACL", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/openconfig-acl:acl/acl-sets/acl-set[name=A001][type=ACL_IPV4]"), + wantRetCode: codes.NotFound, + valTest: true, + }, } for _, td := range tds { if td.valTest == true { - // wait for 2 seconds for change to sync - time.Sleep(2 * time.Second) t.Run(td.desc, func(t *testing.T) { runTestGet(t, ctx, gClient, td.pathTarget, td.textPbPath, td.wantRetCode, td.wantRespVal, td.valTest) }) + t.Run(td.desc + " (unprefixed path)", func(t *testing.T) { + p := removeModulePrefixFromPathPb(t, td.textPbPath) + runTestGet(t, ctx, gClient, td.pathTarget, p, td.wantRetCode, td.wantRespVal, td.valTest) + }) } else { t.Run(td.desc, func(t *testing.T) { runTestSet(t, ctx, gClient, td.pathTarget, td.textPbPath, td.wantRetCode, td.wantRespVal, td.attributeData, td.operation) @@ -2589,7 +2687,9 @@ func TestGNOI(t *testing.T) { t.Fatalf("Invalid System Time %d", resp.Time) } }) + t.Run("SonicShowTechsupport", func(t *testing.T) { + t.Skip("Not supported yet") sc := sgpb.NewSonicServiceClient(conn) rtime := time.Now().AddDate(0, -1, 0) req := &sgpb.TechsupportRequest{ @@ -2624,6 +2724,7 @@ func TestGNOI(t *testing.T) { for _, v := range cfg_data { t.Run("SonicCopyConfig", func(t *testing.T) { + t.Skip("Not supported yet") sc := sgpb.NewSonicServiceClient(conn) req := &sgpb.CopyConfigRequest{ Input: &sgpb.CopyConfigRequest_Input{ @@ -2709,7 +2810,7 @@ func TestBulkSet(t *testing.T) { go runServer(t, s) defer s.s.Stop() - // prepareDb(t) + prepareDbTranslib(t) //t.Log("Start gNMI client") tlsConfig := &tls.Config{InsecureSkipVerify: true} @@ -2726,32 +2827,81 @@ func TestBulkSet(t *testing.T) { gClient := pb.NewGNMIClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + t.Run("Set Multiple mtu", func(t *testing.T) { - pbPath1, _ := xpath.ToGNMIPath("openconfig-interfaces:interfaces/interface[name=Ethernet0]/config/mtu") - v := &pb.TypedValue{ - Value: &pb.TypedValue_JsonIetfVal{JsonIetfVal: []byte("{\"mtu\": 9104}")}} - update1 := &pb.Update{ - Path: pbPath1, - Val: v, - } - pbPath2, _ := xpath.ToGNMIPath("openconfig-interfaces:interfaces/interface[name=Ethernet4]/config/mtu") - v2 := &pb.TypedValue{ - Value: &pb.TypedValue_JsonIetfVal{JsonIetfVal: []byte("{\"mtu\": 9105}")}} - update2 := &pb.Update{ - Path: pbPath2, - Val: v2, - } + req := &pb.SetRequest{ + Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}}, + Update: []*pb.Update{ + newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`), + newPbUpdate("interface[name=Ethernet4]/config/mtu", `{"mtu": 9105}`), + }} + runTestSetRaw(t, ctx, gClient, req, codes.OK) + }) + t.Run("Update and Replace", func(t *testing.T) { + aclKeys := `"name": "A002", "type": "ACL_IPV4"` req := &pb.SetRequest{ - Update: []*pb.Update{update1, update2}, + Replace: []*pb.Update{ + newPbUpdate( + "openconfig-acl:acl/acl-sets/acl-set", + `{"acl-set": [{`+aclKeys+`, "config":{`+aclKeys+`}}]}`), + }, + Update: []*pb.Update{ + newPbUpdate( + "interfaces/interface[name=Ethernet0]/config/description", + `{"description": "Bulk update 1"}`), + newPbUpdate( + "openconfig-interfaces:interfaces/interface[name=Ethernet4]/config/description", + `{"description": "Bulk update 2"}`), + }} + runTestSetRaw(t, ctx, gClient, req, codes.OK) + }) + + aclPath1, _ := ygot.StringToStructuredPath("/acl/acl-sets") + aclPath2, _ := ygot.StringToStructuredPath("/openconfig-acl:acl/acl-sets") + + t.Run("Multiple deletes", func(t *testing.T) { + req := &pb.SetRequest{ + Delete: []*pb.Path{aclPath1, aclPath2}, } + runTestSetRaw(t, ctx, gClient, req, codes.OK) + }) - _, err = gClient.Set(ctx, req) - _, ok := status.FromError(err) - if !ok { - t.Fatal("got a non-grpc error from grpc call") + t.Run("Invalid Update Path", func(t *testing.T) { + req := &pb.SetRequest{ + Delete: []*pb.Path{aclPath1, aclPath2}, + Update: []*pb.Update{ + newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`), + }} + runTestSetRaw(t, ctx, gClient, req, codes.Unknown) + }) + + t.Run("Invalid Replace Path", func(t *testing.T) { + req := &pb.SetRequest{ + Delete: []*pb.Path{aclPath1, aclPath2}, + Replace: []*pb.Update{ + newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`), + }} + runTestSetRaw(t, ctx, gClient, req, codes.Unknown) + }) + + t.Run("Invalid Delete Path", func(t *testing.T) { + req := &pb.SetRequest{ + Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}}, + Delete: []*pb.Path{aclPath1, aclPath2}, } + runTestSetRaw(t, ctx, gClient, req, codes.Unknown) }) + +} + +func newPbUpdate(path, value string) *pb.Update { + p, _ := ygot.StringToStructuredPath(path) + v := &pb.TypedValue_JsonIetfVal{JsonIetfVal: extractJSON(value)} + return &pb.Update{ + Path: p, + Val: &pb.TypedValue{Value: v}, + } } type loginCreds struct { @@ -3173,9 +3323,9 @@ func TestConnectionsKeepAlive(t *testing.T) { } func TestClient(t *testing.T) { - var mutexDeInitDone sync.Mutex - var mutexHBDone sync.Mutex - var mutexIdxDone sync.Mutex + var mutexDeInit sync.RWMutex + var mutexHB sync.RWMutex + var mutexIdx sync.RWMutex // sonic-host:device-test-event is a test event. // Events client will drop it on floor. @@ -3195,45 +3345,51 @@ func TestClient(t *testing.T) { mock1 := gomonkey.ApplyFunc(sdc.C_init_subs, func(use_cache bool) unsafe.Pointer { return nil - }) - defer mock1.Reset() + }) + defer mock1.Reset() mock2 := gomonkey.ApplyFunc(sdc.C_recv_evt, func(h unsafe.Pointer) (int, sdc.Evt_rcvd) { rc := (int)(0) var evt sdc.Evt_rcvd - mutexIdxDone.Lock() - defer mutexIdxDone.Unlock() - if event_index < len(events) { - evt = events[event_index] - event_index++ + mutexIdx.Lock() + current_index := event_index + mutexIdx.Unlock() + if current_index < len(events) { + evt = events[current_index] + mutexIdx.RLock() + event_index = current_index + 1 + mutexIdx.RUnlock() } else { time.Sleep(time.Millisecond * time.Duration(rcv_timeout)) rc = -1 } return rc, evt - }) - defer mock2.Reset() + }) + defer mock2.Reset() mock3 := gomonkey.ApplyFunc(sdc.Set_heartbeat, func(val int) { - mutexHBDone.Lock() - defer mutexHBDone.Unlock() + mutexHB.RLock() heartbeat = val + mutexHB.RUnlock() }) - - defer mock3.Reset() + defer mock3.Reset() mock4 := gomonkey.ApplyFunc(sdc.C_deinit_subs, func(h unsafe.Pointer) { - mutexDeInitDone.Lock() - defer mutexDeInitDone.Unlock() + mutexDeInit.RLock() deinit_done = true + mutexDeInit.RUnlock() }) - - defer mock4.Reset() + defer mock4.Reset() mock5 := gomonkey.ApplyMethod(reflect.TypeOf(&queue.PriorityQueue{}), "Put", func(pq *queue.PriorityQueue, item ...queue.Item) error { return fmt.Errorf("Queue error") }) - defer mock5.Reset() + defer mock5.Reset() + + mock6 := gomonkey.ApplyMethod(reflect.TypeOf(&queue.PriorityQueue{}), "Len", func(pq *queue.PriorityQueue) int { + return 150000 // Max size for pending events in PQ is 102400 + }) + defer mock6.Reset() s := createServer(t, 8081) go runServer(t, s) @@ -3250,6 +3406,10 @@ func TestClient(t *testing.T) { pause int poll int } { + { + desc: "dropped event", + poll: 3, + }, { desc: "queue error", poll: 3, @@ -3261,27 +3421,39 @@ func TestClient(t *testing.T) { } sdc.C_init_subs(true) - var gotNotiMu sync.Mutex + + var mutexNoti sync.RWMutex + for testNum, tt := range tests { - mutexHBDone.Lock() + mutexHB.RLock() heartbeat = 0 - mutexHBDone.Unlock() - mutexIdxDone.Lock() + mutexHB.RUnlock() + + mutexIdx.RLock() event_index = 0 - mutexIdxDone.Unlock() + mutexIdx.RUnlock() + + mutexDeInit.RLock() deinit_done = false + mutexDeInit.RUnlock() + t.Run(tt.desc, func(t *testing.T) { c := client.New() defer c.Close() var gotNoti []string q.NotificationHandler = func(n client.Notification) error { - gotNotiMu.Lock() - defer gotNotiMu.Unlock() if nn, ok := n.(client.Update); ok { nn.TS = time.Unix(0, 200) str := fmt.Sprintf("%v", nn.Val) - gotNoti = append(gotNoti, str) + + mutexNoti.Lock() + currentNoti := gotNoti + mutexNoti.Unlock() + + mutexNoti.RLock() + gotNoti = append(currentNoti, str) + mutexNoti.RUnlock() } return nil } @@ -3295,31 +3467,38 @@ func TestClient(t *testing.T) { time.Sleep(time.Millisecond * 2000) - gotNotiMu.Lock() - // -1 to discount test event, which receiver would drop. - if testNum != 0 { + if testNum > 1 { + mutexNoti.Lock() + // -1 to discount test event, which receiver would drop. if (len(events) - 1) != len(gotNoti) { - fmt.Printf("noti[%d] != events[%d]", len(gotNoti), len(events)-1) + t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events)-1) } - mutexHBDone.Lock() + + mutexHB.Lock() if (heartbeat != HEARTBEAT_SET) { t.Errorf("Heartbeat is not set %d != expected:%d", heartbeat, HEARTBEAT_SET) } - mutexHBDone.Unlock() + mutexHB.Unlock() + fmt.Printf("DONE: Expect events:%d - 1 gotNoti=%d\n", len(events), len(gotNoti)) + mutexNoti.Unlock() } - gotNotiMu.Unlock() }) + if testNum == 0 { + mock6.Reset() + } + + if testNum == 1 { mock5.Reset() } time.Sleep(time.Millisecond * 1000) - mutexDeInitDone.Lock() + mutexDeInit.Lock() if deinit_done == false { t.Errorf("Events client deinit *NOT* called.") } - mutexDeInitDone.Unlock() + mutexDeInit.Unlock() // t.Log("END of a TEST") } diff --git a/gnmi_server/transl_sub_test.go b/gnmi_server/transl_sub_test.go new file mode 100644 index 00000000..4998ae22 --- /dev/null +++ b/gnmi_server/transl_sub_test.go @@ -0,0 +1,937 @@ +package gnmi + +import ( + "crypto/tls" + "fmt" + "path/filepath" + "reflect" + "strings" + "testing" + "time" + + "github.com/Azure/sonic-mgmt-common/translib" + "github.com/Workiva/go-datastructures/queue" + "github.com/golang/protobuf/proto" + "github.com/openconfig/gnmi/client" + gnmipath "github.com/openconfig/gnmi/path" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" + extnpb "github.com/openconfig/gnmi/proto/gnmi_ext" + "github.com/openconfig/ygot/ygot" + spb "github.com/sonic-net/sonic-gnmi/proto" + dbconfig "github.com/sonic-net/sonic-gnmi/sonic_db_config" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" +) + +// This file contains subscription test cases for translib managed paths + +const ( + ONCE = gnmipb.SubscriptionList_ONCE + POLL = gnmipb.SubscriptionList_POLL + STREAM = gnmipb.SubscriptionList_STREAM + ON_CHANGE = gnmipb.SubscriptionMode_ON_CHANGE + SAMPLE = gnmipb.SubscriptionMode_SAMPLE + TARGET_DEFINED = gnmipb.SubscriptionMode_TARGET_DEFINED +) + +func TestTranslSubscribe(t *testing.T) { + s := createServer(t, 8081) + go runServer(t, s) + defer s.s.Stop() + + prepareDbTranslib(t) + + t.Run("origin=openconfig", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.OK) + sub.Verify(client.Sync{}) + }) + + t.Run("origin=invalid", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("invalid:/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.Unimplemented) + sub.Verify() + }) + + t.Run("origin=empty,target=empty", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.Unimplemented) + sub.Verify() + }) + + t.Run("origin in path", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("openconfig:/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.OK) + sub.Verify(client.Sync{}) + }) + + t.Run("origin conflict", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("xxx:/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + t.Run("origin conflict in paths", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("openconfig:/openconfig-acl:acl/acl-sets")}, + {Path: strToPath("closeconfig:/openconfig-interfaces/interfaces")}, + }} + sub := doSubscribe(t, req, codes.Unimplemented) + sub.Verify() + }) + + acl1Path := "/openconfig-acl:acl/acl-sets/acl-set[name=ONE][type=ACL_IPV4]" + acl2Path := "/openconfig-acl:acl/acl-sets/acl-set[name=TWO][type=ACL_IPV4]" + + acl1CreatePb := newPbUpdate("/openconfig-acl:acl/acl-sets/acl-set", + `{"acl-set": [{"name": "ONE", "type": "ACL_IPV4", "config": {"name": "ONE", "type": "ACL_IPV4"}}]}`) + acl2CreatePb := newPbUpdate("/openconfig-acl:acl/acl-sets/acl-set", + `{"acl-set": [{"name": "TWO", "type": "ACL_IPV4", "config": {"name": "TWO", "type": "ACL_IPV4", "description": "foo"}}]}`) + acl2DescUpdatePb := newPbUpdate(acl2Path+"/config/description", `{"description": "new"}`) + + acl1DeletePb := strToPath(acl1Path) + acl2DeletePb := strToPath(acl2Path) + acl2DescDeletePb := strToPath(acl2Path + "/config/description") + aclAllDeletePb := strToPath("/openconfig-acl:acl/acl-sets") + + t.Run("ONCE", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + doSet(t, acl1CreatePb) + + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/openconfig-acl:acl/acl-sets/acl-set")}, + }} + + sub := doSubscribe(t, req, codes.OK) + sub.Verify( + Updated(acl1Path+"/name", "ONE"), + Updated(acl1Path+"/type", "ACL_IPV4"), + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + Updated(acl1Path+"/state/name", "ONE"), + Updated(acl1Path+"/state/type", "ACL_IPV4"), + client.Sync{}, + ) + }) + + t.Run("POLL", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + + t.Logf("Start POLL subscription for ACL config container") + req := &gnmipb.SubscriptionList{ + Mode: POLL, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/openconfig-acl:acl/acl-sets/acl-set[name=*][type=*]/config")}, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify empty initial updates") + sub.Verify(client.Sync{}) + + t.Logf("Create ACl1") + time.Sleep(2 * time.Second) + doSet(t, acl1CreatePb) + + t.Logf("Verify poll updates include ACL1 data") + sub.Poll() + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + client.Sync{}, + ) + + t.Logf("Create ACL2") + time.Sleep(2 * time.Second) + doSet(t, acl2CreatePb) + + t.Logf("Verify poll updates include both ACL1 and ACL2 data") + sub.Poll() + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/name", "TWO"), + Updated(acl2Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/description", "foo"), + client.Sync{}, + ) + + t.Logf("Delete ACL2") + time.Sleep(2 * time.Second) + doSet(t, acl2DeletePb) + + t.Logf("Verify poll updates now include ACL1 data only") + sub.Poll() + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + client.Sync{}, + ) + }) + + t.Run("ONCHANGE", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + + t.Logf("Start ON_CHANGE subscription for ACL config container") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/acl-set[name=*][type=*]/config"), Mode: ON_CHANGE}, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify no initial updates") + sub.Verify(client.Sync{}) + + t.Logf("Create ACL2") + doSet(t, acl2CreatePb) + + t.Logf("Verify update notifications for ACL2 data") + sub.Verify( + Updated(acl2Path+"/config/name", "TWO"), + Updated(acl2Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/description", "foo"), + ) + + t.Logf("Create ACL1 and delete description of ACL2") + doSet(t, acl1CreatePb, acl2DescDeletePb) + + t.Logf("Verify delete notification for ACL2 description and updates for ACL1 data") + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + Deleted(acl2Path+"/config/description"), + ) + + t.Logf("Delete ACL1 and set description for ACL2") + doSet(t, acl2DescUpdatePb, acl1DeletePb) + + t.Logf("Verify delete for ACL1 and update for ACL2 description") + sub.Verify( + Deleted(acl1Path+"/config"), + Updated(acl2Path+"/config/description", "new"), + ) + }) + + t.Run("ONCHANGE_unsupported", func(t *testing.T) { + t.Logf("Try ON_CHANGE for the top interface list") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/openconfig-interfaces:interfaces/interface[name=*]"), Mode: ON_CHANGE}, + }} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + sampleInterval := 25 * time.Second + + t.Run("SAMPLE", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + t.Logf("Create ACL1") + doSet(t, acl1CreatePb) + + t.Logf("Start SAMPLE subscription for ACL state container.. interval=%v", sampleInterval) + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/acl-set[name=*][type=*]/state"), + SampleInterval: uint64(sampleInterval.Nanoseconds()), + }, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates include ACL1 data only") + sub.Verify( + Updated(acl1Path+"/state/name", "ONE"), + Updated(acl1Path+"/state/type", "ACL_IPV4"), + client.Sync{}, + ) + + t.Logf("Create ACL2") + doSet(t, acl2CreatePb) + + t.Logf("Verify updates include both ACL data, for 3 intervals") + for i := 1; i <= 3; i++ { + t.Logf("interval %d", i) + sub.VerifyT(sampleInterval - 3*time.Second) // check no notifications before the interval + sub.Verify( + Updated(acl1Path+"/state/name", "ONE"), + Updated(acl1Path+"/state/type", "ACL_IPV4"), + Updated(acl2Path+"/state/name", "TWO"), + Updated(acl2Path+"/state/type", "ACL_IPV4"), + Updated(acl2Path+"/state/description", "foo"), + ) + } + + t.Logf("Delete ACL1 and description of ACL2") + doSet(t, acl1DeletePb, acl2DescDeletePb) + + t.Logf("Verify next iteration includes deletes and updates (for remaining ACL2 data)") + sub.VerifyT(sampleInterval - 3*time.Second) + sub.Verify( + Deleted(acl1Path+"/state"), + Deleted(acl2Path+"/state/description"), + Updated(acl2Path+"/state/name", "TWO"), + Updated(acl2Path+"/state/type", "ACL_IPV4"), + ) + + t.Logf("Verify next iteration has updates only") + sub.VerifyT(sampleInterval - 3*time.Second) + sub.Verify( + Updated(acl2Path+"/state/name", "TWO"), + Updated(acl2Path+"/state/type", "ACL_IPV4"), + ) + }) + + t.Run("SAMPLE_suppress_redundant", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + t.Logf("Create ACL1 and ACL2") + doSet(t, acl1CreatePb, acl2CreatePb) + + t.Logf("Start SAMPLE subscription for ACL config container.. interval=%v, suppress_redundant=true", sampleInterval) + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/acl-set[name=*][type=*]/config"), + SampleInterval: uint64(sampleInterval.Nanoseconds()), + SuppressRedundant: true, + }, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates") + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/name", "TWO"), + Updated(acl2Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/description", "foo"), + client.Sync{}, + ) + + t.Logf("Verify next iteration has no data (due to suppress_redundant)") + sub.VerifyT(sampleInterval + 3*time.Second) + + t.Logf("Delete ACL1 and update ACL2 description") + doSet(t, acl1DeletePb, acl2DescUpdatePb) + + t.Logf("Verify next iteration includes deletes and updates for modified paths only") + sub.VerifyT( + sampleInterval+3*time.Second, + Deleted(acl1Path+"/config"), + Updated(acl2Path+"/config/description", "new"), + ) + + t.Logf("Delete ACL2 description") + doSet(t, acl2DescDeletePb) + + t.Logf("Verify next iteration includes description delete only") + sub.VerifyT( + sampleInterval+3*time.Second, + Deleted(acl2Path+"/config/description"), + ) + + t.Logf("Verify next iteration has no data") + sub.VerifyT(sampleInterval + 3*time.Second) + }) + + t.Run("SAMPLE_leaf", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + t.Logf("Create ACL2") + doSet(t, acl2CreatePb) + + t.Logf("Start SAMPLE subscription for ACL description.. interval=%v, updates_only=true", sampleInterval) + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + UpdatesOnly: true, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/acl-set/state/description"), + SampleInterval: uint64(sampleInterval.Nanoseconds()), + }, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify empty initial updates, due to updates_only") + sub.Verify(client.Sync{}) + + t.Logf("Verify next iteration has the description value") + sub.VerifyT(sampleInterval - 3*time.Second) // check no notifications before the interval + sub.Verify( + Updated(acl2Path+"/state/description", "foo"), + ) + + t.Logf("Update ACL2 description") + doSet(t, acl2DescUpdatePb) + + t.Logf("Verify next iteration has the updated description") + sub.VerifyT(sampleInterval - 3*time.Second) + sub.Verify( + Updated(acl2Path+"/state/description", "new"), + ) + + t.Logf("Delete ACL2") + doSet(t, acl2DeletePb) + + t.Logf("Verify next iteration has delete notification") + sub.VerifyT(sampleInterval - 3*time.Second) + sub.Verify( + Deleted(acl2Path + "/state/description"), + ) + + t.Logf("Verify next iteration has no notifications") + sub.VerifyT(sampleInterval + 3*time.Second) + }) + + t.Run("SAMPLE_invalid_interval", func(t *testing.T) { + t.Logf("Try SAMPLE with 1ms SamplerInterval (too low)") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/openconfig-acl:acl/acl-sets"), + SampleInterval: uint64(time.Millisecond.Nanoseconds()), + }, + }} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + t.Run("SAMPLE_no_interval", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + + t.Logf("Start SAMPLE subscription for ACL description.. without setting SampleInterval") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/acl-set/state/description"), + }, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify empty initial updates") + sub.Verify(client.Sync{}) + + t.Logf("Create ACL2") + doSet(t, acl2CreatePb) + + t.Logf("Verify updates are received after default interval") + sub.VerifyT( + (translib.MinSubscribeInterval+2)*time.Second, + Updated(acl2Path+"/state/description", "foo"), + ) + }) + + t.Run("TARGETDEFINED", func(t *testing.T) { + t.Logf("Start TARGETDEFINED subscription for interface description, in-pkts and in-octets") + interval := 30 * time.Second + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/interfaces/interface[name=Ethernet0]"), + Subscription: []*gnmipb.Subscription{ + { + Path: strToPath("/state/description"), + Mode: TARGET_DEFINED, + }, { + Path: strToPath("/state/counters/in-pkts"), + Mode: TARGET_DEFINED, + SampleInterval: uint64(interval.Nanoseconds()), + }, { + Path: strToPath("/state/counters/in-octets"), + Mode: TARGET_DEFINED, + SampleInterval: uint64(interval.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates includes all three data") + eth0Path := "/openconfig-interfaces:interfaces/interface[name=Ethernet0]" + sub.Verify( + Updated(eth0Path+"/state/description", ""), + Updated(eth0Path+"/state/counters/in-pkts", uint64(0)), + Updated(eth0Path+"/state/counters/in-octets", uint64(0)), + client.Sync{}, + ) + + next := time.Now().Add(interval) + + t.Logf("Update port description") + updateDb(t, DbDataMap{ + "CONFIG_DB": {"PORT|Ethernet0": {"description": "the one"}}, + "APPL_DB": {"PORT_TABLE:Ethernet0": {"description": "the one"}}, + }) + + t.Logf("Verify update notification for port description") + sub.Verify( + Updated(eth0Path+"/state/description", "the one"), + ) + + t.Logf("Verify periodic updates for stats only") + for i := 1; i <= 2; i++ { + sub.VerifyT(time.Until(next) - 3*time.Second) + sub.Verify( + Updated(eth0Path+"/state/counters/in-pkts", uint64(0)), + Updated(eth0Path+"/state/counters/in-octets", uint64(0)), + ) + next = time.Now().Add(interval) + } + }) + + t.Run("TARGETDEFINED_split", func(t *testing.T) { + interval := 30 * time.Second + eth0State := "/openconfig-interfaces:interfaces/interface[name=Ethernet0]/state" + + t.Logf("Start TARGETDEFINED subscription for interface state container") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{ + Path: strToPath(eth0State), + Mode: TARGET_DEFINED, + SampleInterval: uint64(interval.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates includes nodes from both state and counters containers") + sub.GlobCompare = true + sub.Verify( + Updated(eth0State+"/counters/*", nil), + Updated(eth0State+"/*", nil), + client.Sync{}, + ) + + t.Logf("Verify next updates contains only counters data") + sub.VerifyT(interval - 2*time.Second) + sub.Verify( + Updated(eth0State+"/counters/*", nil), + ) + }) + + t.Run("hearbeat", func(t *testing.T) { + saInterval := 30 * time.Second + hbInterval := saInterval + 10*time.Second + + t.Logf("Start an ON_CHANGE and SAMPLE subscription with heartbeat %v", hbInterval) + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-interfaces:interfaces/interface[name=Ethernet0]"), + Subscription: []*gnmipb.Subscription{ + { + Path: strToPath("/config/enabled"), + Mode: SAMPLE, + SuppressRedundant: true, + SampleInterval: uint64(saInterval.Nanoseconds()), + HeartbeatInterval: uint64(hbInterval.Nanoseconds()), + }, { + Path: strToPath("/state/oper-status"), + Mode: ON_CHANGE, + HeartbeatInterval: uint64(hbInterval.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates contains both data") + eth0Path := "/openconfig-interfaces:interfaces/interface[name=Ethernet0]" + sub.Verify( + Updated(eth0Path+"/config/enabled", false), + Updated(eth0Path+"/state/oper-status", "DOWN"), + client.Sync{}, + ) + + t.Logf("Verify updates received only after heartbeat interval") + sub.VerifyT(hbInterval - 2*time.Second) + sub.Verify( + Updated(eth0Path+"/config/enabled", false), + Updated(eth0Path+"/state/oper-status", "DOWN"), + ) + }) + + t.Run("hearbeat_invalid (sample)", func(t *testing.T) { + t.Logf("Try a SAMPLE subscription with 1ms heartbeat") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{ + Path: strToPath("/interfaces/interface/config/mtu"), + Mode: SAMPLE, + SuppressRedundant: true, + HeartbeatInterval: uint64(time.Millisecond.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + t.Run("hearbeat_invalid (onchange)", func(t *testing.T) { + t.Logf("Try an ON_CHANGE subscription with 1ms heartbeat") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{ + Path: strToPath("/interfaces/interface/config/mtu"), + Mode: ON_CHANGE, + HeartbeatInterval: uint64(time.Millisecond.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + t.Run("bundle_version_0.0.0", func(t *testing.T) { + t.Logf("Start a subscription with BundleVersion=0.0.0") + req := &gnmipb.SubscribeRequest{ + Request: &gnmipb.SubscribeRequest_Subscribe{ + Subscribe: &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/interfaces/interface[name=Ethernet0]"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/config/mtu"), Mode: ON_CHANGE}, + {Path: strToPath("/state/mtu"), Mode: SAMPLE}, + }}}, + Extension: []*extnpb.Extension{newBundleVersion(t, "0.0.0")}, + } + sub := doSubscribeRaw(t, req, codes.OK) + sub.Verify( + Updated("/openconfig-interfaces:interfaces/interface[name=Ethernet0]/config/mtu", uint64(9100)), + Updated("/openconfig-interfaces:interfaces/interface[name=Ethernet0]/state/mtu", uint64(9100)), + client.Sync{}, + ) + }) + + t.Run("bundle_version_invalid", func(t *testing.T) { + t.Logf("Start POLL subscription with BundleVersion=100.0.0") + req := &gnmipb.SubscribeRequest{ + Request: &gnmipb.SubscribeRequest_Subscribe{ + Subscribe: &gnmipb.SubscriptionList{ + Mode: POLL, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/interfaces/interface[name=Ethernet0]/config/mtu")}, + }}}, + Extension: []*extnpb.Extension{newBundleVersion(t, "100.0.0")}, + } + sub := doSubscribeRaw(t, req, codes.InvalidArgument) + sub.Verify() + }) +} + +func strToPath(s string) *gnmipb.Path { + var origin string + if k := strings.IndexByte(s, ':') + 1; k > 0 && k < len(s) && s[k] == '/' { + origin = s[:k-1] + s = s[k:] + } + p, _ := ygot.StringToStructuredPath(s) + p.Origin = origin + return p +} + +func strToCPath(s string) client.Path { + p := strToPath(s) + return gnmipath.ToStrings(p, false) +} + +func Updated(p string, v interface{}) client.Update { + return client.Update{Path: strToCPath(p), Val: v} +} + +func Deleted(p string) client.Delete { + return client.Delete{Path: strToCPath(p)} +} + +type testSubscriber struct { + t *testing.T + client *client.CacheClient + notiQ *queue.Queue + + GlobCompare bool // treat expected paths as glob patterns in Verify() +} + +func doSubscribe(t *testing.T, subReq *gnmipb.SubscriptionList, exStatus codes.Code) *testSubscriber { + t.Helper() + req := &gnmipb.SubscribeRequest{ + Request: &gnmipb.SubscribeRequest_Subscribe{Subscribe: subReq}} + return doSubscribeRaw(t, req, exStatus) +} + +func doSubscribeRaw(t *testing.T, req *gnmipb.SubscribeRequest, exStatus codes.Code) *testSubscriber { + t.Helper() + q, err := client.NewQuery(req) + if err != nil { + t.Fatalf("NewQuery failed: %v", err) + } + + sub := &testSubscriber{ + t: t, + client: client.New(), + notiQ: queue.New(100), + } + + t.Cleanup(sub.close) + + q.Addrs = []string{"127.0.0.1:8081"} + q.TLS = &tls.Config{InsecureSkipVerify: true} + q.NotificationHandler = func(n client.Notification) error { + //fmt.Printf(">>>> %#v\n", n) + return sub.notiQ.Put(n) + } + + go func() { + err = sub.client.Subscribe(context.Background(), q) + if _, ok := status.FromError(err); !ok || status.Code(err) != exStatus { + msg := fmt.Sprintf("Subscribe failed: expected=%v, received=%v", exStatus, err) + sub.notiQ.Put(client.NewError(msg)) + } else if err != nil { + sub.notiQ.Dispose() // got the expected error.. stop listening immediately + } + }() + + return sub +} + +func (sub *testSubscriber) close() { + if sub != nil { + sub.client.Close() + sub.notiQ.Dispose() + } +} + +func (sub *testSubscriber) Poll() { + if err := sub.client.Poll(); err != nil { + sub.t.Helper() + sub.t.Fatalf("Poll failed: %v", err) + } +} + +func (sub *testSubscriber) Verify(expect ...client.Notification) { + sub.VerifyT(5*time.Second, expect...) +} + +func (sub *testSubscriber) VerifyT(timeout time.Duration, expect ...client.Notification) { + sub.t.Helper() + extra := make([]client.Notification, 0) + matched := make(map[int]client.Notification) + deadine := time.Now().Add(timeout) + + for { + n := sub.nextNoti(deadine) + if n == nil { + break // timeout + } + if err, ok := n.(client.Error); ok { + sub.t.Fatal(err.Error()) + } + + index := -1 + for i, ex := range expect { + if sub.compareNoti(n, ex) { + index = i + break + } + } + if index != -1 { + matched[index] = n + } else { + extra = append(extra, n) + } + if _, ok := n.(client.Sync); ok { + break + } + if !sub.GlobCompare && (len(matched) == len(expect)) { + break + } + } + + // if len(matched) == len(expect) && len(extra) == 0 { + // return + // } + switch { + case len(extra) != 0: // found extra updates + case sub.GlobCompare && len(matched) == 0 && len(expect) != 0: // no glob matches found + case !sub.GlobCompare && len(matched) != len(expect): // wrong number of matches + default: + return + } + + for _, n := range extra { + sub.t.Errorf("unexpected: %#v", n) + } + for i, n := range expect { + if matched[i] == nil { + sub.t.Errorf("missing: %#v", n) + } + } + sub.t.FailNow() +} + +func (sub *testSubscriber) nextNoti(deadline time.Time) client.Notification { + sub.t.Helper() + timeout := time.Until(deadline) + if timeout <= 0 { + return nil + } + n, err := sub.notiQ.Poll(1, timeout) + if err == queue.ErrTimeout || err == queue.ErrDisposed { + return nil + } else if err != nil { + sub.t.Fatalf("Unexpected error while waiting for a notification: %v", err) + } + + switch noti := n[0].(type) { + case client.Update: + noti.TS = time.Time{} + return noti + case client.Delete: + noti.TS = time.Time{} + return noti + case client.Error: + sub.t.Fatalf("Unexpected error notification: %s", noti.Error()) + case client.Connected: + return sub.nextNoti(deadline) + } + + return n[0].(client.Notification) +} + +func (sub *testSubscriber) compareNoti(n, exp client.Notification) bool { + if !sub.GlobCompare { + return reflect.DeepEqual(n, exp) + } + + var path, expPath string + var val, expVal interface{} + switch exp := exp.(type) { + case client.Update: + if u, ok := n.(client.Update); ok { + path, val = pathToString(u.Path), u.Val + expPath, expVal = pathToString(exp.Path), exp.Val + } else { + return false + } + case client.Delete: + if d, ok := n.(client.Delete); ok { + path = pathToString(d.Path) + expPath = pathToString(exp.Path) + } else { + return false + } + default: + return reflect.DeepEqual(n, exp) + } + + if ok, _ := filepath.Match(expPath, path); !ok { + return false + } + return expVal == nil || reflect.DeepEqual(val, expVal) +} + +func doSet(t *testing.T, data ...interface{}) { + t.Helper() + req := &gnmipb.SetRequest{} + for _, v := range data { + switch v := v.(type) { + case *gnmipb.Path: + req.Delete = append(req.Delete, v) + case *gnmipb.Update: + req.Update = append(req.Update, v) + default: + t.Fatalf("Unsupported set value: %T %v", v, v) + } + } + + cred := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) + conn, err := grpc.Dial("127.0.0.1:8081", grpc.WithTransportCredentials(cred)) + if err != nil { + t.Fatalf("Could not create client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + defer conn.Close() + + _, err = gnmipb.NewGNMIClient(conn).Set(ctx, req) + if err != nil { + t.Fatalf("Set failed: %v", err) + } +} + +// DbDataMap is a map[DBNAME]map[KEY]map[FIELD]VALUE +type DbDataMap map[string]map[string]map[string]interface{} + +func updateDb(t *testing.T, data DbDataMap) { + t.Helper() + for dbName, tableData := range data { + n := dbconfig.GetDbId(dbName, dbconfig.GetDbDefaultNamespace()) + redis := getRedisClientN(t, n, dbconfig.GetDbDefaultNamespace()) + defer redis.Close() + for key, fields := range tableData { + if fields == nil { + redis.Del(key) + continue + } + + modFields := make(map[string]interface{}) + delFields := make([]string, 0) + for n, v := range fields { + if v == nil { + delFields = append(delFields, n) + } else { + modFields[n] = v + } + } + + if len(modFields) != 0 { + redis.HMSet(key, modFields) + } + if len(delFields) != 0 { + redis.HDel(key, delFields...) + } + } + } +} + +func newBundleVersion(t *testing.T, version string) *extnpb.Extension { + t.Helper() + v, err := proto.Marshal(&spb.BundleVersion{Version: version}) + if err != nil { + t.Fatalf("Invalid version %s; err=%v", version, err) + } + ext := &extnpb.RegisteredExtension{Id: spb.BUNDLE_VERSION_EXT, Msg: v} + return &extnpb.Extension{Ext: &extnpb.Extension_RegisteredExt{RegisteredExt: ext}} +} diff --git a/go.mod b/go.mod index 8bdccb5b..69c0adb2 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 golang.org/x/net v0.0.0-20201110031124-69a78807bb2b google.golang.org/grpc v1.33.2 + google.golang.org/protobuf v1.25.0 gopkg.in/yaml.v2 v2.2.8 ) diff --git a/proto/sonic_internal.pb.go b/proto/sonic_internal.pb.go index 9d9edbd9..239d7286 100644 --- a/proto/sonic_internal.pb.go +++ b/proto/sonic_internal.pb.go @@ -1,17 +1,32 @@ +// sonic_internal.proto describes the message format used internally by SONiC + // Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.6.1 // source: sonic_internal.proto package gnmi_sonic -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" -import gnmi "github.com/openconfig/gnmi/proto/gnmi" +import ( + proto "github.com/golang/protobuf/proto" + gnmi "github.com/openconfig/gnmi/proto/gnmi" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 type State int32 @@ -21,111 +36,251 @@ const ( State_RUNNING State = 2 ) -var State_name = map[int32]string{ - 0: "STOPPED", - 1: "INIT", - 2: "RUNNING", -} -var State_value = map[string]int32{ - "STOPPED": 0, - "INIT": 1, - "RUNNING": 2, +// Enum value maps for State. +var ( + State_name = map[int32]string{ + 0: "STOPPED", + 1: "INIT", + 2: "RUNNING", + } + State_value = map[string]int32{ + "STOPPED": 0, + "INIT": 1, + "RUNNING": 2, + } +) + +func (x State) Enum() *State { + p := new(State) + *p = x + return p } func (x State) String() string { - return proto.EnumName(State_name, int32(x)) + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (State) Descriptor() protoreflect.EnumDescriptor { + return file_sonic_internal_proto_enumTypes[0].Descriptor() +} + +func (State) Type() protoreflect.EnumType { + return &file_sonic_internal_proto_enumTypes[0] +} + +func (x State) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use State.Descriptor instead. +func (State) EnumDescriptor() ([]byte, []int) { + return file_sonic_internal_proto_rawDescGZIP(), []int{0} } -func (State) EnumDescriptor() ([]byte, []int) { return fileDescriptor1, []int{0} } // Value is the message that reprents a stream of updates for a given path, used internally. type Value struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + // prefix used with path - Prefix *gnmi.Path `protobuf:"bytes,1,opt,name=prefix" json:"prefix,omitempty"` + Prefix *gnmi.Path `protobuf:"bytes,1,opt,name=prefix,proto3" json:"prefix,omitempty"` // The device specific, or path corresponding to a value. - Path *gnmi.Path `protobuf:"bytes,2,opt,name=path" json:"path,omitempty"` + Path *gnmi.Path `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"` // timestamp for the corresponding value, nanoseconds since epoch. // If timestamp is not set the default will assume to // be the current system time. - Timestamp int64 `protobuf:"varint,3,opt,name=timestamp" json:"timestamp,omitempty"` - Val *gnmi.TypedValue `protobuf:"bytes,4,opt,name=val" json:"val,omitempty"` + Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Val *gnmi.TypedValue `protobuf:"bytes,4,opt,name=val,proto3" json:"val,omitempty"` // Indicate target has sent all values associated with the subscription // at least once. - SyncResponse bool `protobuf:"varint,5,opt,name=sync_response,json=syncResponse" json:"sync_response,omitempty"` + SyncResponse bool `protobuf:"varint,5,opt,name=sync_response,json=syncResponse,proto3" json:"sync_response,omitempty"` // fatal error happened. - Fatal string `protobuf:"bytes,6,opt,name=fatal" json:"fatal,omitempty"` + Fatal string `protobuf:"bytes,6,opt,name=fatal,proto3" json:"fatal,omitempty"` + // Notification to be used in place of 1-4 if present + Notification *gnmi.Notification `protobuf:"bytes,7,opt,name=notification,proto3" json:"notification,omitempty"` } -func (m *Value) Reset() { *m = Value{} } -func (m *Value) String() string { return proto.CompactTextString(m) } -func (*Value) ProtoMessage() {} -func (*Value) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} } +func (x *Value) Reset() { + *x = Value{} + if protoimpl.UnsafeEnabled { + mi := &file_sonic_internal_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Value) String() string { + return protoimpl.X.MessageStringOf(x) +} -func (m *Value) GetPrefix() *gnmi.Path { - if m != nil { - return m.Prefix +func (*Value) ProtoMessage() {} + +func (x *Value) ProtoReflect() protoreflect.Message { + mi := &file_sonic_internal_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Value.ProtoReflect.Descriptor instead. +func (*Value) Descriptor() ([]byte, []int) { + return file_sonic_internal_proto_rawDescGZIP(), []int{0} +} + +func (x *Value) GetPrefix() *gnmi.Path { + if x != nil { + return x.Prefix } return nil } -func (m *Value) GetPath() *gnmi.Path { - if m != nil { - return m.Path +func (x *Value) GetPath() *gnmi.Path { + if x != nil { + return x.Path } return nil } -func (m *Value) GetTimestamp() int64 { - if m != nil { - return m.Timestamp +func (x *Value) GetTimestamp() int64 { + if x != nil { + return x.Timestamp } return 0 } -func (m *Value) GetVal() *gnmi.TypedValue { - if m != nil { - return m.Val +func (x *Value) GetVal() *gnmi.TypedValue { + if x != nil { + return x.Val } return nil } -func (m *Value) GetSyncResponse() bool { - if m != nil { - return m.SyncResponse +func (x *Value) GetSyncResponse() bool { + if x != nil { + return x.SyncResponse } return false } -func (m *Value) GetFatal() string { - if m != nil { - return m.Fatal +func (x *Value) GetFatal() string { + if x != nil { + return x.Fatal } return "" } -func init() { - proto.RegisterType((*Value)(nil), "gnmi.sonic.Value") - proto.RegisterEnum("gnmi.sonic.State", State_name, State_value) -} - -func init() { proto.RegisterFile("sonic_internal.proto", fileDescriptor1) } - -var fileDescriptor1 = []byte{ - // 269 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x8f, 0x4f, 0x4b, 0xc3, 0x30, - 0x18, 0xc6, 0xcd, 0xfa, 0xc7, 0xed, 0x9d, 0x42, 0x09, 0x3b, 0x04, 0x11, 0x29, 0xf5, 0x52, 0x14, - 0x5a, 0xd1, 0xaf, 0xa0, 0x48, 0x2f, 0xb5, 0x64, 0xd5, 0xeb, 0xc8, 0x6a, 0xda, 0x06, 0xda, 0x24, - 0xb4, 0x99, 0xb8, 0x6f, 0xe8, 0xc7, 0x92, 0xa6, 0x03, 0x0f, 0xde, 0xf2, 0xfc, 0x9e, 0xdf, 0x03, - 0x79, 0x61, 0x33, 0x2a, 0x29, 0xaa, 0x9d, 0x90, 0x86, 0x0f, 0x92, 0x75, 0x89, 0x1e, 0x94, 0x51, - 0x18, 0x1a, 0xd9, 0x8b, 0xc4, 0x56, 0x57, 0x0f, 0x8d, 0x30, 0xed, 0x61, 0x9f, 0x54, 0xaa, 0x4f, - 0x95, 0xe6, 0xb2, 0x52, 0xb2, 0x16, 0x4d, 0x3a, 0x19, 0xa9, 0xb5, 0xe7, 0xa7, 0x5d, 0xd8, 0x1c, - 0xfd, 0x20, 0xf0, 0x3e, 0x58, 0x77, 0xe0, 0x38, 0x02, 0x5f, 0x0f, 0xbc, 0x16, 0xdf, 0x04, 0x85, - 0x28, 0x5e, 0x3f, 0x42, 0x62, 0xb5, 0x82, 0x99, 0x96, 0x9e, 0x1a, 0x7c, 0x03, 0xae, 0x66, 0xa6, - 0x25, 0x8b, 0x7f, 0x86, 0xe5, 0xf8, 0x1a, 0x56, 0x46, 0xf4, 0x7c, 0x34, 0xac, 0xd7, 0xc4, 0x09, - 0x51, 0xec, 0xd0, 0x3f, 0x80, 0x23, 0x70, 0xbe, 0x58, 0x47, 0x5c, 0x3b, 0x0e, 0xe6, 0x71, 0x79, - 0xd4, 0xfc, 0xd3, 0x7e, 0x80, 0x4e, 0x25, 0xbe, 0x85, 0xcb, 0xf1, 0x28, 0xab, 0xdd, 0xc0, 0x47, - 0xad, 0xe4, 0xc8, 0x89, 0x17, 0xa2, 0x78, 0x49, 0x2f, 0x26, 0x48, 0x4f, 0x0c, 0x6f, 0xc0, 0xab, - 0x99, 0x61, 0x1d, 0xf1, 0x43, 0x14, 0xaf, 0xe8, 0x1c, 0xee, 0xee, 0xc1, 0xdb, 0x1a, 0x66, 0x38, - 0x5e, 0xc3, 0xf9, 0xb6, 0x7c, 0x2b, 0x8a, 0x97, 0xe7, 0xe0, 0x0c, 0x2f, 0xc1, 0xcd, 0xf2, 0xac, - 0x0c, 0xd0, 0x84, 0xe9, 0x7b, 0x9e, 0x67, 0xf9, 0x6b, 0xb0, 0xd8, 0xfb, 0xf6, 0xfc, 0xa7, 0xdf, - 0x00, 0x00, 0x00, 0xff, 0xff, 0x14, 0x1d, 0x16, 0xfb, 0x54, 0x01, 0x00, 0x00, +func (x *Value) GetNotification() *gnmi.Notification { + if x != nil { + return x.Notification + } + return nil +} + +var File_sonic_internal_proto protoreflect.FileDescriptor + +var file_sonic_internal_proto_rawDesc = []byte{ + 0x0a, 0x14, 0x73, 0x6f, 0x6e, 0x69, 0x63, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x73, 0x6f, 0x6e, + 0x69, 0x63, 0x1a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, + 0x70, 0x65, 0x6e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f, 0x67, 0x6e, 0x6d, 0x69, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6e, 0x6d, 0x69, 0x2f, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x80, 0x02, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x22, + 0x0a, 0x06, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, + 0x2e, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x52, 0x06, 0x70, 0x72, 0x65, 0x66, + 0x69, 0x78, 0x12, 0x1e, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x0a, 0x2e, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x52, 0x04, 0x70, 0x61, + 0x74, 0x68, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x12, 0x22, 0x0a, 0x03, 0x76, 0x61, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, + 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, + 0x03, 0x76, 0x61, 0x6c, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x72, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x73, 0x79, 0x6e, + 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x61, 0x74, + 0x61, 0x6c, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x66, 0x61, 0x74, 0x61, 0x6c, 0x12, + 0x36, 0x0a, 0x0c, 0x6e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x4e, 0x6f, 0x74, + 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0c, 0x6e, 0x6f, 0x74, 0x69, 0x66, + 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2a, 0x2b, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x54, 0x4f, 0x50, 0x50, 0x45, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, + 0x04, 0x49, 0x4e, 0x49, 0x54, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x55, 0x4e, 0x4e, 0x49, + 0x4e, 0x47, 0x10, 0x02, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_sonic_internal_proto_rawDescOnce sync.Once + file_sonic_internal_proto_rawDescData = file_sonic_internal_proto_rawDesc +) + +func file_sonic_internal_proto_rawDescGZIP() []byte { + file_sonic_internal_proto_rawDescOnce.Do(func() { + file_sonic_internal_proto_rawDescData = protoimpl.X.CompressGZIP(file_sonic_internal_proto_rawDescData) + }) + return file_sonic_internal_proto_rawDescData +} + +var file_sonic_internal_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_sonic_internal_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_sonic_internal_proto_goTypes = []interface{}{ + (State)(0), // 0: gnmi.sonic.State + (*Value)(nil), // 1: gnmi.sonic.Value + (*gnmi.Path)(nil), // 2: gnmi.Path + (*gnmi.TypedValue)(nil), // 3: gnmi.TypedValue + (*gnmi.Notification)(nil), // 4: gnmi.Notification +} +var file_sonic_internal_proto_depIdxs = []int32{ + 2, // 0: gnmi.sonic.Value.prefix:type_name -> gnmi.Path + 2, // 1: gnmi.sonic.Value.path:type_name -> gnmi.Path + 3, // 2: gnmi.sonic.Value.val:type_name -> gnmi.TypedValue + 4, // 3: gnmi.sonic.Value.notification:type_name -> gnmi.Notification + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_sonic_internal_proto_init() } +func file_sonic_internal_proto_init() { + if File_sonic_internal_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_sonic_internal_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Value); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_sonic_internal_proto_rawDesc, + NumEnums: 1, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_sonic_internal_proto_goTypes, + DependencyIndexes: file_sonic_internal_proto_depIdxs, + EnumInfos: file_sonic_internal_proto_enumTypes, + MessageInfos: file_sonic_internal_proto_msgTypes, + }.Build() + File_sonic_internal_proto = out.File + file_sonic_internal_proto_rawDesc = nil + file_sonic_internal_proto_goTypes = nil + file_sonic_internal_proto_depIdxs = nil } diff --git a/proto/sonic_internal.proto b/proto/sonic_internal.proto index bb8b3402..6eef071d 100644 --- a/proto/sonic_internal.proto +++ b/proto/sonic_internal.proto @@ -31,4 +31,7 @@ message Value { // fatal error happened. string fatal = 6; + + // Notification to be used in place of 1-4 if present + gnmi.Notification notification = 7; } \ No newline at end of file diff --git a/sonic_data_client/db_client.go b/sonic_data_client/db_client.go index 81c977ae..f24db68f 100644 --- a/sonic_data_client/db_client.go +++ b/sonic_data_client/db_client.go @@ -79,9 +79,7 @@ var IntervalTicker = func(interval time.Duration) <-chan time.Time { return time.After(interval) } -const maxRetries = 3 -const retryDelay = 50 * time.Millisecond -var MockFail bool = false +var MockFail int = 0 var NeedMock bool = false var intervalTickerMutex sync.Mutex @@ -143,6 +141,13 @@ func (val Value) Compare(other queue.Item) int { return -1 } +func (val Value) GetTimestamp() int64 { + if n := val.GetNotification(); n != nil { + return n.GetTimestamp() + } + return val.Value.GetTimestamp() +} + type DbClient struct { prefix *gnmipb.Path pathG2S map[*gnmipb.Path][]tablePath @@ -369,6 +374,12 @@ func ValToResp(val Value) (*gnmipb.SubscribeResponse, error) { return nil, fmt.Errorf("%s", fatal) } + // In case the client returned a full gnmipb.Notification object + if n := val.GetNotification(); n != nil { + return &gnmipb.SubscribeResponse{ + Response: &gnmipb.SubscribeResponse_Update{Update: n}}, nil + } + return &gnmipb.SubscribeResponse{ Response: &gnmipb.SubscribeResponse_Update{ Update: &gnmipb.Notification{ @@ -738,24 +749,14 @@ func tableData2Msi(tblPath *tablePath, useKey bool, op *string, msi *map[string] if MockFail == 1 { MockFail++ - defaultTimeout := redisDb.DefaultTimeout - fmt.Printf("Default timeout: %v\n", defaultTimeout) - time.Sleep(2 * defaultTimeout) + fmt.Printf("debug sleep for 10s\n") + time.Sleep(30 * time.Second) } for idx, dbkey := range dbkeys { - for i := 0; i < maxRetries; i++ { - fv, err = redisDb.HGetAll(dbkey).Result() - if err != nil { - if i == maxRetries - 1 { - log.V(2).Infof("frank redis HGetAll failed with 3 times retry for %v, dbkey %s, err %v", tblPath, dbkey, err) - return err - } - time.Sleep(retryDelay) - log.V(2).Infof("frank redis HGetAll failed and retry for %v, dbkey %s, err %v", tblPath, dbkey, err) - continue - } else { - break - } + fv, err = redisDb.HGetAll(dbkey).Result() + if err != nil { + log.V(2).Infof("redis HGetAll failed for %v, dbkey %s", tblPath, dbkey) + return err } if tblPath.jsonTableKey != "" { // If jsonTableKey was prepared, use it diff --git a/sonic_data_client/events_client.go b/sonic_data_client/events_client.go index 9c387cdb..224ccb4d 100644 --- a/sonic_data_client/events_client.go +++ b/sonic_data_client/events_client.go @@ -72,11 +72,11 @@ type EventClient struct { subs_handle unsafe.Pointer stopped int - stopMutex sync.Mutex + stopMutex sync.RWMutex // Stats counter counters map[string]uint64 - countersMutex sync.Mutex + countersMutex sync.RWMutex last_latencies [LATENCY_LIST_SIZE]uint64 last_latency_index int @@ -184,7 +184,9 @@ func compute_latency(evtc *EventClient) { total += v } } + evtc.countersMutex.RLock() evtc.counters[LATENCY] = (uint64) (total/LATENCY_LIST_SIZE/1000/1000) + evtc.countersMutex.RUnlock() } } @@ -201,18 +203,20 @@ func update_stats(evtc *EventClient) { * This helps add some initial pause before accessing DB * for existing values. */ - evtc.stopMutex.Lock() - defer evtc.stopMutex.Unlock() - for evtc.stopped == 0 { + for !evtc.isStopped() { var val uint64 compute_latency(evtc) + + evtc.countersMutex.Lock() for _, val = range evtc.counters { if val != 0 { break } } + evtc.countersMutex.Unlock() + if val != 0 { break } @@ -221,7 +225,7 @@ func update_stats(evtc *EventClient) { /* Populate counters from DB for cumulative counters. */ - if evtc.stopped == 0 { + if !evtc.isStopped() { ns := sdcfg.GetDbDefaultNamespace() rclient = redis.NewClient(&redis.Options{ @@ -249,15 +253,20 @@ func update_stats(evtc *EventClient) { } /* Main running loop that updates DB */ - for evtc.stopped == 0 { + for !evtc.isStopped() { tmp_counters := make(map[string]uint64) // compute latency compute_latency(evtc) - for key, val := range evtc.counters { + evtc.countersMutex.Lock() + current_counters := evtc.counters + evtc.countersMutex.Unlock() + + for key, val := range current_counters { tmp_counters[key] = val + db_counters[key] } + tmp_counters[DROPPED] += evtc.last_errors if (wr_counters == nil) || !reflect.DeepEqual(tmp_counters, *wr_counters) { @@ -307,7 +316,7 @@ func C_deinit_subs(h unsafe.Pointer) { func get_events(evtc *EventClient) { defer evtc.wg.Done() - str_ptr := C.malloc(C.sizeof_char * C.size_t(EVENT_BUFFSZ)) + str_ptr := C.malloc(C.sizeof_char * C.size_t(EVENT_BUFFSZ)) defer C.free(unsafe.Pointer(str_ptr)) evt_ptr = (*C.event_receive_op_C_t)(C.malloc(C.size_t(unsafe.Sizeof(C.event_receive_op_C_t{})))) @@ -322,9 +331,13 @@ func get_events(evtc *EventClient) { if rc == 0 { evtc.countersMutex.Lock() - evtc.counters[MISSED] += (uint64)(evt.Missed_cnt) + current_missed_cnt := evtc.counters[MISSED] evtc.countersMutex.Unlock() + evtc.countersMutex.RLock() + evtc.counters[MISSED] = current_missed_cnt + (uint64)(evt.Missed_cnt) + evtc.countersMutex.RUnlock() + if !strings.HasPrefix(evt.Event_str, TEST_EVENT) { qlen := evtc.q.Len() @@ -346,13 +359,17 @@ func get_events(evtc *EventClient) { log.V(1).Infof("Invalid event string: %v", evt.Event_str) } } else { - evtc.counters[DROPPED] += 1 + evtc.countersMutex.Lock() + dropped_cnt := evtc.counters[DROPPED] + evtc.countersMutex.Unlock() + + evtc.countersMutex.RLock() + evtc.counters[DROPPED] = dropped_cnt + 1 + evtc.countersMutex.RUnlock() } } } - evtc.stopMutex.Lock() - defer evtc.stopMutex.Unlock() - if evtc.stopped == 1 { + if evtc.isStopped() { break } // TODO: Record missed count in stats table. @@ -362,9 +379,9 @@ func get_events(evtc *EventClient) { C_deinit_subs(evtc.subs_handle) evtc.subs_handle = nil // set evtc.stopped for case where send_event error and channel was not stopped - evtc.stopMutex.Lock() + evtc.stopMutex.RLock() evtc.stopped = 1 - evtc.stopMutex.Unlock() + evtc.stopMutex.RUnlock() } func send_event(evtc *EventClient, tv *gnmipb.TypedValue, @@ -396,18 +413,25 @@ func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w go update_stats(evtc) evtc.wg.Add(1) - evtc.stopMutex.Lock() - defer evtc.stopMutex.Unlock() - for evtc.stopped == 0 { + for !evtc.isStopped() { select { case <-evtc.channel: + evtc.stopMutex.RLock() evtc.stopped = 1 + evtc.stopMutex.RUnlock() log.V(3).Infof("Channel closed by client") return } } } +func (evtc *EventClient) isStopped() bool { + evtc.stopMutex.Lock() + val := evtc.stopped + evtc.stopMutex.Unlock() + return val == 1 +} + func (evtc *EventClient) Get(wg *sync.WaitGroup) ([]*spb.Value, error) { return nil, nil diff --git a/sonic_data_client/transl_data_client.go b/sonic_data_client/transl_data_client.go index 08208523..21e63925 100644 --- a/sonic_data_client/transl_data_client.go +++ b/sonic_data_client/transl_data_client.go @@ -1,23 +1,26 @@ -// Package client provides a generic access layer for data available in system +// Package client provides a generic access layer for data available in system package client import ( - spb "github.com/sonic-net/sonic-gnmi/proto" - transutil "github.com/sonic-net/sonic-gnmi/transl_utils" + "context" + "fmt" + "reflect" + "runtime" + "sync" + "time" + + "github.com/Azure/sonic-mgmt-common/translib" + "github.com/Workiva/go-datastructures/queue" log "github.com/golang/glog" "github.com/golang/protobuf/proto" gnmipb "github.com/openconfig/gnmi/proto/gnmi" gnmi_extpb "github.com/openconfig/gnmi/proto/gnmi_ext" - "github.com/Workiva/go-datastructures/queue" - "sync" - "time" - "fmt" - "reflect" - "github.com/Azure/sonic-mgmt-common/translib" + "github.com/openconfig/ygot/ygot" "github.com/sonic-net/sonic-gnmi/common_utils" - "bytes" - "encoding/json" - "context" + spb "github.com/sonic-net/sonic-gnmi/proto" + transutil "github.com/sonic-net/sonic-gnmi/transl_utils" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -33,23 +36,34 @@ type TranslClient struct { channel chan struct{} q *queue.PriorityQueue - synced sync.WaitGroup // Control when to send gNMI sync_response - w *sync.WaitGroup // wait for all sub go routines to finish - mu sync.RWMutex // Mutex for data protection among routines for transl_client - ctx context.Context //Contains Auth info and request info + synced sync.WaitGroup // Control when to send gNMI sync_response + w *sync.WaitGroup // wait for all sub go routines to finish + mu sync.RWMutex // Mutex for data protection among routines for transl_client + ctx context.Context //Contains Auth info and request info extensions []*gnmi_extpb.Extension + + version *translib.Version // Client version; populated by parseVersion() + encoding gnmipb.Encoding } -func NewTranslClient(prefix *gnmipb.Path, getpaths []*gnmipb.Path, ctx context.Context, extensions []*gnmi_extpb.Extension) (Client, error) { +func NewTranslClient(prefix *gnmipb.Path, getpaths []*gnmipb.Path, ctx context.Context, extensions []*gnmi_extpb.Extension, opts ...TranslClientOption) (Client, error) { var client TranslClient var err error client.ctx = ctx client.prefix = prefix client.extensions = extensions + if getpaths != nil { + var addWildcardKeys bool + for _, o := range opts { + if _, ok := o.(TranslWildcardOption); ok { + addWildcardKeys = true + } + } + client.path2URI = make(map[*gnmipb.Path]string) /* Populate GNMI path to REST URL map. */ - err = transutil.PopulateClientPaths(prefix, getpaths, &client.path2URI) + err = transutil.PopulateClientPaths(prefix, getpaths, &client.path2URI, addWildcardKeys) } if err != nil { @@ -99,7 +113,6 @@ func (c *TranslClient) Get(w *sync.WaitGroup) ([]*spb.Value, error) { func (c *TranslClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update) error { rc, ctx := common_utils.GetContext(c.ctx) c.ctx = ctx - var uri string version := getBundleVersion(c.extensions) if version != nil { rc.BundleVersion = version @@ -109,23 +122,18 @@ func (c *TranslClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, upda return transutil.TranslProcessBulk(delete, replace, update, c.prefix, c.ctx) } else { if len(delete) == 1 { - /* Convert the GNMI Path to URI. */ - transutil.ConvertToURI(c.prefix, delete[0], &uri) - return transutil.TranslProcessDelete(uri, c.ctx) + return transutil.TranslProcessDelete(c.prefix, delete[0], c.ctx) } if len(replace) == 1 { - /* Convert the GNMI Path to URI. */ - transutil.ConvertToURI(c.prefix, replace[0].GetPath(), &uri) - return transutil.TranslProcessReplace(uri, replace[0].GetVal(), c.ctx) + return transutil.TranslProcessReplace(c.prefix, replace[0], c.ctx) } if len(update) == 1 { - /* Convert the GNMI Path to URI. */ - transutil.ConvertToURI(c.prefix, update[0].GetPath(), &uri) - return transutil.TranslProcessUpdate(uri, update[0].GetVal(), c.ctx) + return transutil.TranslProcessUpdate(c.prefix, update[0], c.ctx) } } return nil } + func enqueFatalMsgTranslib(c *TranslClient, msg string) { c.q.Put(Value{ &spb.Value{ @@ -134,375 +142,309 @@ func enqueFatalMsgTranslib(c *TranslClient, msg string) { }, }) } -type ticker_info struct{ - t *time.Ticker - sub *gnmipb.Subscription - heartbeat bool + +func enqueueSyncMessage(c *TranslClient) { + m := &spb.Value{ + Timestamp: time.Now().UnixNano(), + SyncResponse: true, + } + c.q.Put(Value{m}) +} + +// recoverSubscribe recovers from possible panics during subscribe handling. +// It pushes a fatal message to the RPC handler's queue, which forces the server to +// close the RPC with an error status. Should always be used as a deferred function. +func recoverSubscribe(c *TranslClient) { + if r := recover(); r != nil { + buff := make([]byte, 1<<12) + buff = buff[:runtime.Stack(buff, false)] + log.Error(string(buff)) + + err := status.Errorf(codes.Internal, "%v", r) + enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error =%v", err.Error())) + } +} + +type ticker_info struct { + t *time.Ticker + sub *gnmipb.Subscription + pathStr string + heartbeat bool +} + +func getTranslNotificationType(mode gnmipb.SubscriptionMode) translib.NotificationType { + switch mode { + case gnmipb.SubscriptionMode_ON_CHANGE: + return translib.OnChange + case gnmipb.SubscriptionMode_SAMPLE: + return translib.Sample + default: + return translib.TargetDefined + } +} + +func tickerCleanup(ticker_map map[int][]*ticker_info, c *TranslClient) { + for _, v := range ticker_map { + for _, ti := range v { + ti.t.Stop() + } + } } func (c *TranslClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { - rc, ctx := common_utils.GetContext(c.ctx) - c.ctx = ctx c.w = w defer c.w.Done() + defer recoverSubscribe(c) + c.q = q c.channel = stop - version := getBundleVersion(c.extensions) - if version != nil { - rc.BundleVersion = version + c.encoding = subscribe.Encoding + + if err := c.parseVersion(); err != nil { + enqueFatalMsgTranslib(c, err.Error()) + return } ticker_map := make(map[int][]*ticker_info) + + defer tickerCleanup(ticker_map, c) var cases []reflect.SelectCase cases_map := make(map[int]int) var subscribe_mode gnmipb.SubscriptionMode - stringPaths := make([]string, len(subscribe.Subscription)) - for i,sub := range subscribe.Subscription { - stringPaths[i] = c.path2URI[sub.Path] + translPaths := make([]translib.IsSubscribePath, len(subscribe.Subscription)) + sampleCache := make(map[string]*ygotCache) + + for i, sub := range subscribe.Subscription { + translPaths[i].ID = uint32(i) + translPaths[i].Path = c.path2URI[sub.Path] + translPaths[i].Mode = getTranslNotificationType(sub.Mode) + } + + rc, _ := common_utils.GetContext(c.ctx) + ss := translib.NewSubscribeSession() + defer ss.Close() + + req := translib.IsSubscribeRequest{ + Paths: translPaths, + Session: ss, + User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}, } - req := translib.IsSubscribeRequest{Paths:stringPaths} - subSupport,_ := translib.IsSubscribeSupported(req) + if c.version != nil { + req.ClientVersion = *c.version + } + + subSupport, err := translib.IsSubscribeSupported(req) + if err != nil { + enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error =%v", err.Error())) + return + } + var onChangeSubsString []string - var onChangeSubsgNMI []*gnmipb.Path - onChangeMap := make(map[string]*gnmipb.Path) - valueCache := make(map[string]string) - for i,sub := range subscribe.Subscription { - fmt.Println(sub.Mode, sub.SampleInterval) - switch sub.Mode { + for i, pInfo := range subSupport { + sub := subscribe.Subscription[pInfo.ID] + log.V(6).Infof("Start Sub: %v", sub) + pathStr := pInfo.Path + switch sub.Mode { case gnmipb.SubscriptionMode_TARGET_DEFINED: - - if subSupport[i].Err == nil && subSupport[i].IsOnChangeSupported { - if subSupport[i].PreferredType == translib.Sample { - subscribe_mode = gnmipb.SubscriptionMode_SAMPLE - } else if subSupport[i].PreferredType == translib.OnChange { - subscribe_mode = gnmipb.SubscriptionMode_ON_CHANGE - } + if pInfo.IsOnChangeSupported && pInfo.PreferredType == translib.OnChange { + subscribe_mode = gnmipb.SubscriptionMode_ON_CHANGE } else { subscribe_mode = gnmipb.SubscriptionMode_SAMPLE } - case gnmipb.SubscriptionMode_ON_CHANGE: - if subSupport[i].Err == nil && subSupport[i].IsOnChangeSupported { - if (subSupport[i].MinInterval > 0) { - subscribe_mode = gnmipb.SubscriptionMode_ON_CHANGE - }else{ - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid subscribe path %v", stringPaths[i])) - return - } + if pInfo.IsOnChangeSupported { + subscribe_mode = gnmipb.SubscriptionMode_ON_CHANGE } else { - enqueFatalMsgTranslib(c, fmt.Sprintf("ON_CHANGE Streaming mode invalid for %v", stringPaths[i])) + enqueFatalMsgTranslib(c, fmt.Sprintf("ON_CHANGE Streaming mode invalid for %v", pathStr)) return } case gnmipb.SubscriptionMode_SAMPLE: - if (subSupport[i].MinInterval > 0) { - subscribe_mode = gnmipb.SubscriptionMode_SAMPLE - }else{ - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid subscribe path %v", stringPaths[i])) - return - } + subscribe_mode = gnmipb.SubscriptionMode_SAMPLE default: - log.V(1).Infof("Bad Subscription Mode for client %v ", c) enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Subscription Mode %d", sub.Mode)) return } - fmt.Println("subscribe_mode:", subscribe_mode) + + if pInfo.MinInterval <= 0 { // should not happen + pInfo.MinInterval = translib.MinSubscribeInterval + } + + if hb := sub.HeartbeatInterval; hb > 0 && hb < uint64(pInfo.MinInterval)*uint64(time.Second) { + enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Heartbeat Interval %ds, minimum interval is %ds", + sub.HeartbeatInterval/uint64(time.Second), subSupport[i].MinInterval)) + return + } + + log.V(6).Infof("subscribe_mode %v for path %s", subscribe_mode, pathStr) if subscribe_mode == gnmipb.SubscriptionMode_SAMPLE { interval := int(sub.SampleInterval) + minInterval := pInfo.MinInterval * int(time.Second) if interval == 0 { - interval = subSupport[i].MinInterval * int(time.Second) - } else { - if interval < (subSupport[i].MinInterval*int(time.Second)) { - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Sample Interval %ds, minimum interval is %ds", interval/int(time.Second), subSupport[i].MinInterval)) - return - } + interval = minInterval + } else if interval < minInterval { + enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid SampleInterval %ds, minimum interval is %ds", interval/int(time.Second), pInfo.MinInterval)) + return } - if !subscribe.UpdatesOnly { - //Send initial data now so we can send sync response. - val, err := transutil.TranslProcessGet(c.path2URI[sub.Path], nil, c.ctx) - if err != nil { - return - } - spbv := &spb.Value{ - Prefix: c.prefix, - Path: sub.Path, - Timestamp: time.Now().UnixNano(), - SyncResponse: false, - Val: val, - } - c.q.Put(Value{spbv}) - valueCache[c.path2URI[sub.Path]] = string(val.GetJsonIetfVal()) + + reqPath, _ := ygot.StringToStructuredPath(pathStr) + yCache := newYgotCache(reqPath) + sampleCache[pathStr] = yCache + ts := translSubscriber{ + client: c, + session: ss, + sampleCache: yCache, + filterMsgs: subscribe.UpdatesOnly, + } + + // Force ignore init updates for subpaths to prevent duplicates. + // But performs duplicate gets though -- needs optimization. + if pInfo.IsSubPath { + ts.filterMsgs = true } - addTimer(c, ticker_map, &cases, cases_map, interval, sub, false) + // do initial sync & build the cache + ts.doSample(pathStr) + addTimer(c, ticker_map, &cases, cases_map, interval, sub, pathStr, false) //Heartbeat intervals are valid for SAMPLE in the case suppress_redundant is specified if sub.SuppressRedundant && sub.HeartbeatInterval > 0 { - if int(sub.HeartbeatInterval) < subSupport[i].MinInterval * int(time.Second) { - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Heartbeat Interval %ds, minimum interval is %ds", int(sub.HeartbeatInterval)/int(time.Second), subSupport[i].MinInterval)) - return - } - addTimer(c, ticker_map, &cases, cases_map, int(sub.HeartbeatInterval), sub, true) + addTimer(c, ticker_map, &cases, cases_map, int(sub.HeartbeatInterval), sub, pathStr, true) } } else if subscribe_mode == gnmipb.SubscriptionMode_ON_CHANGE { - onChangeSubsString = append(onChangeSubsString, c.path2URI[sub.Path]) - onChangeSubsgNMI = append(onChangeSubsgNMI, sub.Path) - onChangeMap[c.path2URI[sub.Path]] = sub.Path + onChangeSubsString = append(onChangeSubsString, pathStr) if sub.HeartbeatInterval > 0 { - if int(sub.HeartbeatInterval) < subSupport[i].MinInterval * int(time.Second) { - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Heartbeat Interval %ds, minimum interval is %ds", int(sub.HeartbeatInterval)/int(time.Second), subSupport[i].MinInterval)) - return - } - addTimer(c, ticker_map, &cases, cases_map, int(sub.HeartbeatInterval), sub, true) + addTimer(c, ticker_map, &cases, cases_map, int(sub.HeartbeatInterval), sub, pathStr, true) } - } + log.V(6).Infof("End Sub: %v", sub) } - if len(onChangeSubsString) > 0 { - c.w.Add(1) - c.synced.Add(1) - go TranslSubscribe(onChangeSubsgNMI, onChangeSubsString, onChangeMap, c, subscribe.UpdatesOnly) + if len(onChangeSubsString) > 0 { + ts := translSubscriber{ + client: c, + session: ss, + filterMsgs: subscribe.UpdatesOnly, + } + ts.doOnChange(onChangeSubsString) + } else { + // If at least one ON_CHANGE subscription was present, then + // ts.doOnChange() would have sent the sync message. + // Explicitly send one here if all are SAMPLE subscriptions. + enqueueSyncMessage(c) } - // Wait until all data values corresponding to the path(s) specified - // in the SubscriptionList has been transmitted at least once - c.synced.Wait() - spbs := &spb.Value{ - Timestamp: time.Now().UnixNano(), - SyncResponse: true, - } - c.q.Put(Value{spbs}) + cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.channel)}) for { chosen, _, ok := reflect.Select(cases) - - if !ok { return } - for _,tick := range ticker_map[cases_map[chosen]] { - fmt.Printf("tick, heartbeat: %t, path: %s", tick.heartbeat, c.path2URI[tick.sub.Path]) - val, err := transutil.TranslProcessGet(c.path2URI[tick.sub.Path], nil, c.ctx) - if err != nil { - return - } - spbv := &spb.Value{ - Prefix: c.prefix, - Path: tick.sub.Path, - Timestamp: time.Now().UnixNano(), - SyncResponse: false, - Val: val, + for _, tick := range ticker_map[cases_map[chosen]] { + log.V(6).Infof("tick, heartbeat: %t, path: %s\n", tick.heartbeat, c.path2URI[tick.sub.Path]) + ts := translSubscriber{ + client: c, + session: ss, + sampleCache: sampleCache[tick.pathStr], + filterDups: (!tick.heartbeat && tick.sub.SuppressRedundant), } - - - if (tick.sub.SuppressRedundant) && (!tick.heartbeat) && (string(val.GetJsonIetfVal()) == valueCache[c.path2URI[tick.sub.Path]]) { - log.V(6).Infof("Redundant Message Suppressed #%v", string(val.GetJsonIetfVal())) - } else { - c.q.Put(Value{spbv}) - valueCache[c.path2URI[tick.sub.Path]] = string(val.GetJsonIetfVal()) - log.V(6).Infof("Added spbv #%v", spbv) - } - - + ts.doSample(tick.pathStr) } } } -func addTimer(c *TranslClient, ticker_map map[int][]*ticker_info, cases *[]reflect.SelectCase, cases_map map[int]int, interval int, sub *gnmipb.Subscription, heartbeat bool) { +func addTimer(c *TranslClient, ticker_map map[int][]*ticker_info, cases *[]reflect.SelectCase, cases_map map[int]int, interval int, sub *gnmipb.Subscription, pathStr string, heartbeat bool) { //Reuse ticker for same sample intervals, otherwise create a new one. if ticker_map[interval] == nil { ticker_map[interval] = make([]*ticker_info, 1, 1) - ticker_map[interval][0] = &ticker_info { - t: time.NewTicker(time.Duration(interval) * time.Nanosecond), - sub: sub, + ticker_map[interval][0] = &ticker_info{ + t: time.NewTicker(time.Duration(interval) * time.Nanosecond), + sub: sub, + pathStr: pathStr, heartbeat: heartbeat, } cases_map[len(*cases)] = interval *cases = append(*cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ticker_map[interval][0].t.C)}) - }else { - ticker_map[interval] = append(ticker_map[interval], &ticker_info { - t: ticker_map[interval][0].t, - sub: sub, + } else { + ticker_map[interval] = append(ticker_map[interval], &ticker_info{ + t: ticker_map[interval][0].t, + sub: sub, + pathStr: pathStr, heartbeat: heartbeat, }) } - - -} - -func TranslSubscribe(gnmiPaths []*gnmipb.Path, stringPaths []string, pathMap map[string]*gnmipb.Path, c *TranslClient, updates_only bool) { - defer c.w.Done() - rc, ctx := common_utils.GetContext(c.ctx) - c.ctx = ctx - q := queue.NewPriorityQueue(1, false) - var sync_done bool - req := translib.SubscribeRequest{Paths:stringPaths, Q:q, Stop:c.channel} - if rc.BundleVersion != nil { - nver, err := translib.NewVersion(*rc.BundleVersion) - if err != nil { - log.V(2).Infof("Subscribe operation failed with error =%v", err.Error()) - enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error =%v", err.Error())) - return - } - req.ClientVersion = nver - } - translib.Subscribe(req) - for { - items, err := q.Get(1) - if err != nil { - log.V(1).Infof("%v", err) - return - } - switch v := items[0].(type) { - case *translib.SubscribeResponse: - - if v.IsTerminated { - //DB Connection or other backend error - enqueFatalMsgTranslib(c, "DB Connection Error") - close(c.channel) - return - } - - var jv []byte - dst := new(bytes.Buffer) - json.Compact(dst, v.Payload) - jv = dst.Bytes() - - /* Fill the values into GNMI data structures . */ - val := &gnmipb.TypedValue{ - Value: &gnmipb.TypedValue_JsonIetfVal{ - JsonIetfVal: jv, - }} - - spbv := &spb.Value{ - Prefix: c.prefix, - Path: pathMap[v.Path], - Timestamp: v.Timestamp, - SyncResponse: false, - Val: val, - } - - //Don't send initial update with full object if user wants updates only. - if updates_only && !sync_done { - log.V(1).Infof("Msg suppressed due to updates_only") - } else { - c.q.Put(Value{spbv}) - } - - log.V(6).Infof("Added spbv #%v", spbv) - - if v.SyncComplete && !sync_done { - fmt.Println("SENDING SYNC") - c.synced.Done() - sync_done = true - } - default: - log.V(1).Infof("Unknown data type %v for %v in queue", items[0], v) - } - } } - - func (c *TranslClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { - rc, ctx := common_utils.GetContext(c.ctx) - c.ctx = ctx c.w = w defer c.w.Done() + defer recoverSubscribe(c) c.q = q c.channel = poll - version := getBundleVersion(c.extensions) - if version != nil { - rc.BundleVersion = version + c.encoding = subscribe.Encoding + + if err := c.parseVersion(); err != nil { + enqueFatalMsgTranslib(c, err.Error()) + return } + synced := false for { _, more := <-c.channel if !more { log.V(1).Infof("%v poll channel closed, exiting pollDb routine", c) + enqueFatalMsgTranslib(c, "") return } + t1 := time.Now() - for gnmiPath, URIPath := range c.path2URI { + for _, gnmiPath := range c.path2URI { if synced || !subscribe.UpdatesOnly { - val, err := transutil.TranslProcessGet(URIPath, nil, c.ctx) - if err != nil { - return - } - - spbv := &spb.Value{ - Prefix: c.prefix, - Path: gnmiPath, - Timestamp: time.Now().UnixNano(), - SyncResponse: false, - Val: val, - } - - c.q.Put(Value{spbv}) - log.V(6).Infof("Added spbv #%v", spbv) + ts := translSubscriber{client: c} + ts.doSample(gnmiPath) } } - c.q.Put(Value{ - &spb.Value{ - Timestamp: time.Now().UnixNano(), - SyncResponse: true, - }, - }) + enqueueSyncMessage(c) synced = true log.V(4).Infof("Sync done, poll time taken: %v ms", int64(time.Since(t1)/time.Millisecond)) } } + func (c *TranslClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { - rc, ctx := common_utils.GetContext(c.ctx) - c.ctx = ctx c.w = w defer c.w.Done() + defer recoverSubscribe(c) + c.q = q c.channel = once + c.encoding = subscribe.Encoding - version := getBundleVersion(c.extensions) - if version != nil { - rc.BundleVersion = version + if err := c.parseVersion(); err != nil { + enqueFatalMsgTranslib(c, err.Error()) + return } + _, more := <-c.channel if !more { log.V(1).Infof("%v once channel closed, exiting onceDb routine", c) + enqueFatalMsgTranslib(c, "") return } - t1 := time.Now() - for gnmiPath, URIPath := range c.path2URI { - val, err := transutil.TranslProcessGet(URIPath, nil, c.ctx) - if err != nil { - return - } - if !subscribe.UpdatesOnly && val != nil { - spbv := &spb.Value{ - Prefix: c.prefix, - Path: gnmiPath, - Timestamp: time.Now().UnixNano(), - SyncResponse: false, - Val: val, - } - - c.q.Put(Value{spbv}) - log.V(6).Infof("Added spbv #%v", spbv) - } + t1 := time.Now() + for _, gnmiPath := range c.path2URI { + ts := translSubscriber{client: c} + ts.doSample(gnmiPath) } - c.q.Put(Value{ - &spb.Value{ - Timestamp: time.Now().UnixNano(), - SyncResponse: true, - }, - }) + enqueueSyncMessage(c) log.V(4).Infof("Sync done, once time taken: %v ms", int64(time.Since(t1)/time.Millisecond)) - + } func (c *TranslClient) Capabilities() []gnmipb.ModelData { @@ -526,18 +468,39 @@ func (c *TranslClient) SentOne(val *Value) { func (c *TranslClient) FailedSend() { } - func getBundleVersion(extensions []*gnmi_extpb.Extension) *string { - for _,e := range extensions { + for _, e := range extensions { switch v := e.Ext.(type) { - case *gnmi_extpb.Extension_RegisteredExt: - if v.RegisteredExt.Id == spb.BUNDLE_VERSION_EXT { - var bv spb.BundleVersion - proto.Unmarshal(v.RegisteredExt.Msg, &bv) - return &bv.Version - } - + case *gnmi_extpb.Extension_RegisteredExt: + if v.RegisteredExt.Id == spb.BUNDLE_VERSION_EXT { + var bv spb.BundleVersion + proto.Unmarshal(v.RegisteredExt.Msg, &bv) + return &bv.Version + } + } } return nil } + +func (c *TranslClient) parseVersion() error { + bv := getBundleVersion(c.extensions) + if bv == nil { + return nil + } + v, err := translib.NewVersion(*bv) + if err != nil { + c.version = &v + return nil + } + log.V(4).Infof("Failed to parse version \"%s\"; err=%v", *bv, err) + return fmt.Errorf("Invalid bundle version: %v", *bv) +} + +type TranslClientOption interface { + IsTranslClientOption() +} + +type TranslWildcardOption struct{} + +func (t TranslWildcardOption) IsTranslClientOption() {} diff --git a/sonic_data_client/transl_subscriber.go b/sonic_data_client/transl_subscriber.go new file mode 100644 index 00000000..2b11658b --- /dev/null +++ b/sonic_data_client/transl_subscriber.go @@ -0,0 +1,386 @@ +//////////////////////////////////////////////////////////////////////////////// +// // +// Copyright 2021 Broadcom. The term Broadcom refers to Broadcom Inc. and/or // +// its subsidiaries. // +// // +// Licensed under the Apache License, Version 2.0 (the "License"); // +// you may not use this file except in compliance with the License. // +// You may obtain a copy of the License at // +// // +// http://www.apache.org/licenses/LICENSE-2.0 // +// // +// Unless required by applicable law or agreed to in writing, software // +// distributed under the License is distributed on an "AS IS" BASIS, // +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // +// See the License for the specific language governing permissions and // +// limitations under the License. // +// // +//////////////////////////////////////////////////////////////////////////////// + +package client + +import ( + "fmt" + "sync" + "time" + + "github.com/Azure/sonic-mgmt-common/translib" + "github.com/Workiva/go-datastructures/queue" + log "github.com/golang/glog" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" + "github.com/openconfig/ygot/ygot" + spb "github.com/sonic-net/sonic-gnmi/proto" + "github.com/sonic-net/sonic-gnmi/transl_utils" +) + +// translSubscriber is an extension of TranslClient to service Subscribe RPC. +type translSubscriber struct { + client *TranslClient + session *translib.SubscribeSession + sampleCache *ygotCache // session cache for SAMPLE; optional + filterMsgs bool // Filter out messages till sync done (updates_only) + filterDups bool // Filter out duplicate updates (suppress_redundant) + stopOnSync bool // Stop upon sync message from translib + synced sync.WaitGroup // To signal receipt of sync message from translib + rcvdPaths map[string]bool // Paths from received SubscribeResponse + msgBuilder notificationBuilder +} + +// notificationBuilder creates gnmipb.Notification from a translib.SubscribeResponse +// instance. Input can be nil, indicating the end of current sample iteration. +type notificationBuilder func( + resp *translib.SubscribeResponse, ts *translSubscriber) (*gnmipb.Notification, error) + +// doSample invokes translib.Stream API to service SAMPLE, POLL and ONCE subscriptions. +// Timer for SAMPLE subscription should be managed outside. +func (ts *translSubscriber) doSample(path string) { + if ts.sampleCache != nil { + ts.msgBuilder = ts.sampleCache.msgBuilder // SAMPLE + ts.rcvdPaths = make(map[string]bool) + } else { + ts.msgBuilder = defaultMsgBuilder // ONCE, POLL or heartbeat for ON_CHANGE + } + + c := ts.client + req := translib.SubscribeRequest{ + Paths: []string{path}, + Q: queue.NewPriorityQueue(1, false), + Session: ts.session, + } + if c.version != nil { + req.ClientVersion = *c.version + } + + c.w.Add(1) + ts.synced.Add(1) + ts.stopOnSync = true + go ts.processResponses(req.Q) + + err := translib.Stream(req) + if err != nil { + req.Q.Dispose() + enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error = %v", err)) + } + + ts.synced.Wait() +} + +// doOnChange handles the ON_CHANGE subscriptions through translib.Subscribe API. +// Returns only after initial updates and sync message are sent to the RPC queue. +func (ts *translSubscriber) doOnChange(stringPaths []string) { + c := ts.client + q := queue.NewPriorityQueue(1, false) + + req := translib.SubscribeRequest{ + Paths: stringPaths, + Q: q, + Stop: c.channel, + Session: ts.session, + } + if c.version != nil { + req.ClientVersion = *c.version + } + + c.w.Add(1) + ts.synced.Add(1) + ts.msgBuilder = defaultMsgBuilder + go ts.processResponses(q) + + err := translib.Subscribe(req) + if err != nil { + q.Dispose() + enqueFatalMsgTranslib(c, "Subscribe operation failed with error: "+err.Error()) + } + + ts.synced.Wait() +} + +// processResponses waits for SubscribeResponse messages from translib over a +// queue, formats them as spb.Value and pushes to the RPC queue. +func (ts *translSubscriber) processResponses(q *queue.PriorityQueue) { + c := ts.client + var syncDone bool + defer c.w.Done() + defer func() { + if !syncDone { + ts.synced.Done() + } + }() + defer recoverSubscribe(c) + + for { + items, err := q.Get(1) + if err == queue.ErrDisposed { + log.V(3).Info("PriorityQueue was disposed!") + return + } + if err != nil { + enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error =%v", err.Error())) + return + } + switch v := items[0].(type) { + case *translib.SubscribeResponse: + + if v.IsTerminated { + //DB Connection or other backend error + enqueFatalMsgTranslib(c, "DB Connection Error") + close(c.channel) + return + } + + if v.SyncComplete { + if ts.stopOnSync { + ts.notify(nil) + log.V(6).Infof("Stopping on sync signal from translib") + return + } + + log.V(6).Infof("SENDING SYNC") + enqueueSyncMessage(c) + syncDone = true + ts.synced.Done() + ts.filterMsgs = false + break + } + + if err := ts.notify(v); err != nil { + log.Warning(err) + enqueFatalMsgTranslib(c, "Internal error") + return + } + default: + log.V(1).Infof("Unknown data type %T in queue", v) + } + } +} + +func (ts *translSubscriber) notify(v *translib.SubscribeResponse) error { + msg, err := ts.msgBuilder(v, ts) + if err != nil { + return err + } + + if msg == nil || (len(msg.Update) == 0 && len(msg.Delete) == 0) { + log.V(6).Infof("Ignore nil message") + return nil + } + + spbv := &spb.Value{Notification: msg} + ts.client.q.Put(Value{spbv}) + log.V(6).Infof("Added spbv %#v", spbv) + return nil +} + +func (ts *translSubscriber) toPrefix(path string) *gnmipb.Path { + p, _ := ygot.StringToStructuredPath(path) + p.Target = ts.client.prefix.GetTarget() + p.Origin = ts.client.prefix.GetOrigin() + return p +} + +func defaultMsgBuilder(v *translib.SubscribeResponse, ts *translSubscriber) (*gnmipb.Notification, error) { + if v == nil { + return nil, nil + } + if ts.filterMsgs { + log.V(2).Infof("Msg suppressed due to updates_only") + return nil, nil + } + + p := ts.toPrefix(v.Path) + n := gnmipb.Notification{ + Prefix: p, + Timestamp: v.Timestamp, + } + + // Move last elem of v.Path to updates & deletes if v.Delete contains + // an empty path (to indicate the v.Path itself was deleted). + var extraPrefix *gnmipb.PathElem + if strSliceContains(v.Delete, "") { + extraPrefix = removeLastPathElem(p) + } + + if v.Update != nil { + var err error + n.Update, err = ts.ygotToScalarValues(extraPrefix, v.Update) + if err != nil { + return nil, err + } + } + + for _, del := range v.Delete { + p, err := ygot.StringToStructuredPath(del) + if err != nil { + return nil, err + } + insertFirstPathElem(p, extraPrefix) + n.Delete = append(n.Delete, p) + } + + return &n, nil +} + +// ygotToScalarValues returns scalar encoded values for a ygot object. +// If prefixElem is provided, it will be prefixed to each value's path. +func (ts *translSubscriber) ygotToScalarValues(prefixElem *gnmipb.PathElem, obj ygot.ValidatedGoStruct) ([]*gnmipb.Update, error) { + tmp, err := ygot.TogNMINotifications(obj, 0, + ygot.GNMINotificationsConfig{ + UsePathElem: true, + PathElemPrefix: nil, + Encoding: ts.client.encoding, + }) + if err != nil { + return nil, err + } + + updates := tmp[0].Update + if prefixElem != nil { + for _, u := range updates { + insertFirstPathElem(u.Path, prefixElem) + } + } + + return updates, nil +} + +// insertFirstPathElem inserts newElem at the beginning of path p. +func insertFirstPathElem(p *gnmipb.Path, newElem *gnmipb.PathElem) { + if newElem != nil { + ne := make([]*gnmipb.PathElem, 0, len(p.Elem)+1) + ne = append(ne, newElem) + p.Elem = append(ne, p.Elem...) + } +} + +// removeLastPathElem removes the last PathElem from the path p. +// Returns the removed element. +func removeLastPathElem(p *gnmipb.Path) *gnmipb.PathElem { + k := len(p.Elem) - 1 + if k < 0 { + return nil + } + if p.Element != nil { + p.Element = p.Element[:k] + } + last := p.Elem[k] + p.Elem = p.Elem[:k] + return last +} + +func strSliceContains(ss []string, v string) bool { + for _, s := range ss { + if s == v { + return true + } + } + return false +} + +// ygotCache holds path to ygot struct mappings +type ygotCache struct { + values map[string]ygot.GoStruct + pattern *gnmipb.Path // Prefix pattern for the cache keys +} + +// newYgotCache creates a new ygotCache instance +func newYgotCache(pattern *gnmipb.Path) *ygotCache { + return &ygotCache{ + values: make(map[string]ygot.GoStruct), + pattern: pattern, + } +} + +// msgBuilder is a notificationBuilder implementation to create a gnmipb.Notification +// message by comparing the SubscribeResponse.Update ygot struct to the cached value. +// Includes only modified or deleted leaf paths if translSubscriber.filterDups is set. +// Returns nil message if translSubscriber.filterMsgs is set or on error. +// Updates the cache with the new ygot struct (SubscribeResponse.Update). +// Special case: if SubscribeResponse is nil, calls deleteMsgBuilder to delete +// non-existing paths from the cache. +func (c *ygotCache) msgBuilder(v *translib.SubscribeResponse, ts *translSubscriber) (*gnmipb.Notification, error) { + if v == nil { + return c.deleteMsgBuilder(ts) + } + + old := c.values[v.Path] + c.values[v.Path] = v.Update + ts.rcvdPaths[v.Path] = true + log.V(2).Infof("%s updated; old=%p, new=%p, filterDups=%v", v.Path, old, v.Update, ts.filterDups) + if ts.filterMsgs { + log.V(2).Infof("Msg suppressed due to updates_only") + return nil, nil + } + + res, err := transl_utils.Diff(old, v.Update, + transl_utils.DiffOptions{ + RecordAll: !ts.filterDups, + }) + if err != nil { + return nil, err + } + + return &gnmipb.Notification{ + Timestamp: v.Timestamp, + Prefix: ts.toPrefix(v.Path), + Update: res.Update, + Delete: res.Delete, + }, nil +} + +// deleteMsgBuilder deletes the cache entries whose path does not appear in +// the translSubscriber.rcvdPaths map. Creates a gnmipb.Notification message +// for the deleted paths. Returns nil message if there are no such delete paths +// or translSubscriber.filterMsgs is set. +func (c *ygotCache) deleteMsgBuilder(ts *translSubscriber) (*gnmipb.Notification, error) { + if ts.filterMsgs { + log.V(2).Infof("Msg suppressed due to updates_only") + return nil, nil + } + var deletePaths []*gnmipb.Path + for path := range c.values { + if !ts.rcvdPaths[path] { + log.V(3).Infof("%s deleted", path) + deletePaths = append(deletePaths, c.toDeletePath(path)) + delete(c.values, path) + } + } + if len(deletePaths) == 0 { + return nil, nil + } + return &gnmipb.Notification{ + Timestamp: time.Now().UnixNano(), + Prefix: ts.toPrefix("/"), + Delete: deletePaths, + }, nil +} + +func (c *ygotCache) toDeletePath(path string) *gnmipb.Path { + p, _ := ygot.StringToStructuredPath(path) + // p will be parent container path when subscribing to a leaf. + // Append the leaf suffix to p if p is shorter than subscribe path. + if n := len(p.Elem); n < len(c.pattern.Elem) { + suffix := c.pattern.Elem[n:] + p.Elem = append(p.Elem, suffix...) + } + return p +} diff --git a/tools/test/env.sh b/tools/test/env.sh index 46209f14..a60b947e 100755 --- a/tools/test/env.sh +++ b/tools/test/env.sh @@ -2,6 +2,8 @@ set -e -. $(dirname ${BASH_SOURCE})/../../../sonic-mgmt-common/tools/test/env.sh \ +TOPDIR=$(realpath $(dirname ${BASH_SOURCE})/../..) + +. ${TOPDIR}/../sonic-mgmt-common/tools/test/env.sh \ --dest=${TOPDIR}/build/test \ --dbconfig-in=${TOPDIR}/testdata/database_config.json \ diff --git a/transl_utils/transl_utils.go b/transl_utils/transl_utils.go index b6b33857..2160fdbf 100644 --- a/transl_utils/transl_utils.go +++ b/transl_utils/transl_utils.go @@ -2,17 +2,19 @@ package transl_utils import ( "bytes" + "context" "encoding/json" - "strings" "fmt" + "log/syslog" + "strings" + + "github.com/Azure/sonic-mgmt-common/translib" + pathutil "github.com/Azure/sonic-mgmt-common/translib/path" + "github.com/Azure/sonic-mgmt-common/translib/tlerr" log "github.com/golang/glog" gnmipb "github.com/openconfig/gnmi/proto/gnmi" - "github.com/Azure/sonic-mgmt-common/translib" + "github.com/openconfig/ygot/ygot" "github.com/sonic-net/sonic-gnmi/common_utils" - "context" - "log/syslog" - "github.com/Azure/sonic-mgmt-common/translib/tlerr" - ) var ( @@ -58,55 +60,40 @@ func GnmiTranslFullPath(prefix, path *gnmipb.Path) *gnmipb.Path { } /* Populate the URI path corresponding GNMI paths. */ -func PopulateClientPaths(prefix *gnmipb.Path, paths []*gnmipb.Path, path2URI *map[*gnmipb.Path]string) error { - var req string - - /* Fetch the URI for each GET URI. */ +func PopulateClientPaths(prefix *gnmipb.Path, paths []*gnmipb.Path, path2URI *map[*gnmipb.Path]string, addWildcardKeys bool) error { + opts := []pathutil.PathValidatorOpt{ + &pathutil.AppendModulePrefix{}, + } + if addWildcardKeys { + opts = append(opts, &pathutil.AddWildcardKeys{}) + } for _, path := range paths { - ConvertToURI(prefix, path, &req) + req, err := ConvertToURI(prefix, path, opts...) + if err != nil { + return err + } (*path2URI)[path] = req } return nil } -/* Populate the URI path corresponding each GNMI paths. */ -func ConvertToURI(prefix *gnmipb.Path, path *gnmipb.Path, req *string) error { +// ConvertToURI returns translib path for a gnmi Path +func ConvertToURI(prefix, path *gnmipb.Path, opts ...pathutil.PathValidatorOpt) (string, error) { fullPath := path if prefix != nil { fullPath = GnmiTranslFullPath(prefix, path) } - elems := fullPath.GetElem() - *req = "/" - - if elems != nil { - /* Iterate through elements. */ - for i, elem := range elems { - log.V(6).Infof("index %d elem : %#v %#v", i, elem.GetName(), elem.GetKey()) - *req += elem.GetName() - key := elem.GetKey() - /* If no keys are present end the element with "/" */ - if key == nil { - *req += "/" - } - - /* If keys are present , process the keys. */ - if key != nil { - for k, v := range key { - log.V(6).Infof("elem : %#v %#v", k, v) - *req += "[" + k + "=" + v + "]" - } - - /* Append "/" after all keys are processed. */ - *req += "/" - } - } + if len(opts) == 0 { + opts = append(opts, &pathutil.AppendModulePrefix{}) + } + pv := pathutil.NewPathValidator(opts...) + if err := pv.Validate(fullPath); err != nil { + return "", err } - /* Trim the "/" at the end which is not required. */ - *req = strings.TrimSuffix(*req, "/") - return nil + return ygot.PathToString(fullPath) } /* Fill the values from TransLib. */ @@ -150,11 +137,14 @@ func TranslProcessGet(uriPath string, op *string, ctx context.Context) (*gnmipb. } /* Delete request handling. */ -func TranslProcessDelete(uri string, ctx context.Context) error { - var str3 string - payload := []byte(str3) +func TranslProcessDelete(prefix, delPath *gnmipb.Path, ctx context.Context) error { + uri, err := ConvertToURI(prefix, delPath) + if err != nil { + return err + } + rc, _ := common_utils.GetContext(ctx) - req := translib.SetRequest{Path:uri, Payload:payload, User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}} + req := translib.SetRequest{Path:uri, User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}} if rc.BundleVersion != nil { nver, err := translib.NewVersion(*rc.BundleVersion) if err != nil { @@ -176,13 +166,13 @@ func TranslProcessDelete(uri string, ctx context.Context) error { } /* Replace request handling. */ -func TranslProcessReplace(uri string, t *gnmipb.TypedValue, ctx context.Context) error { - /* Form the CURL request and send to client . */ - str := string(t.GetJsonIetfVal()) - str3 := strings.Replace(str, "\n", "", -1) - log.V(2).Info("Incoming JSON body is", str) +func TranslProcessReplace(prefix *gnmipb.Path, entry *gnmipb.Update, ctx context.Context) error { + uri, err := ConvertToURI(prefix, entry.GetPath()) + if err != nil { + return err + } - payload := []byte(str3) + payload := entry.GetVal().GetJsonIetfVal() rc, _ := common_utils.GetContext(ctx) req := translib.SetRequest{Path:uri, Payload:payload, User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}} if rc.BundleVersion != nil { @@ -208,13 +198,13 @@ func TranslProcessReplace(uri string, t *gnmipb.TypedValue, ctx context.Context) } /* Update request handling. */ -func TranslProcessUpdate(uri string, t *gnmipb.TypedValue, ctx context.Context) error { - /* Form the CURL request and send to client . */ - str := string(t.GetJsonIetfVal()) - str3 := strings.Replace(str, "\n", "", -1) - log.V(2).Info("Incoming JSON body is", str) +func TranslProcessUpdate(prefix *gnmipb.Path, entry *gnmipb.Update, ctx context.Context) error { + uri, err := ConvertToURI(prefix, entry.GetPath()) + if err != nil { + return err + } - payload := []byte(str3) + payload := entry.GetVal().GetJsonIetfVal() rc, _ := common_utils.GetContext(ctx) req := translib.SetRequest{Path:uri, Payload:payload, User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}} if rc.BundleVersion != nil { @@ -266,12 +256,11 @@ func TranslProcessBulk(delete []*gnmipb.Path, replace []*gnmipb.Update, update [ } } for _,d := range delete { - ConvertToURI(prefix, d, &uri) - var str3 string - payload := []byte(str3) + if uri, err = ConvertToURI(prefix, d); err != nil { + return err + } req := translib.SetRequest{ Path: uri, - Payload: payload, User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}, } if rc.BundleVersion != nil { @@ -284,11 +273,10 @@ func TranslProcessBulk(delete []*gnmipb.Path, replace []*gnmipb.Update, update [ deleteUri = append(deleteUri, uri) } for _,r := range replace { - ConvertToURI(prefix, r.GetPath(), &uri) - str := string(r.GetVal().GetJsonIetfVal()) - str3 := strings.Replace(str, "\n", "", -1) - log.V(2).Info("Incoming JSON body is", str) - payload := []byte(str3) + if uri, err = ConvertToURI(prefix, r.GetPath()); err != nil { + return err + } + payload := r.GetVal().GetJsonIetfVal() req := translib.SetRequest{ Path: uri, Payload: payload, @@ -304,11 +292,10 @@ func TranslProcessBulk(delete []*gnmipb.Path, replace []*gnmipb.Update, update [ replaceUri = append(replaceUri, uri) } for _,u := range update { - ConvertToURI(prefix, u.GetPath(), &uri) - str := string(u.GetVal().GetJsonIetfVal()) - str3 := strings.Replace(str, "\n", "", -1) - log.V(2).Info("Incoming JSON body is", str) - payload := []byte(str3) + if uri, err = ConvertToURI(prefix, u.GetPath()); err != nil { + return err + } + payload := u.GetVal().GetJsonIetfVal() req := translib.SetRequest{ Path: uri, Payload: payload,