Skip to content

Commit

Permalink
Project import generated by Copybara.
Browse files Browse the repository at this point in the history
FolderOrigin-RevId: /usr/local/google/home/gdennis/copybara/temp/folder-destination3951257471430535527/.
  • Loading branch information
GGN Engprod Team authored and greg-dennis committed Oct 28, 2022
1 parent 6e98c70 commit 3bb84a4
Show file tree
Hide file tree
Showing 12 changed files with 84,770 additions and 84,044 deletions.
1 change: 1 addition & 0 deletions generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ package ondatra
//go:generate ./proto/generate.sh
//go:generate ./proxy/proto/generate.sh
//go:generate ./internal/gnmigen/generate.sh
//go:generate ./internal/rawapis/testservice/generate.sh
84,440 changes: 42,219 additions & 42,221 deletions gnmi/oc/schema.go

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions internal/ate/ate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"sync"

"google.golang.org/grpc"
"github.com/openconfig/ondatra/binding"
"github.com/openconfig/ondatra/internal/rawapis"

Expand Down Expand Up @@ -164,12 +163,12 @@ func StopTraffic(ctx context.Context, ate binding.ATE) error {
}

// FetchGNMI returns the GNMI client for the Ixia.
func FetchGNMI(ctx context.Context, ate binding.ATE, opts ...grpc.DialOption) (gpb.GNMIClient, error) {
func FetchGNMI(ctx context.Context, ate binding.ATE) (gpb.GNMIClient, error) {
ix, err := ixiaForATE(ctx, ate)
if err != nil {
return nil, err
}
return ix.FetchGNMI(ctx, opts...)
return ix.FetchGNMI(ctx)
}

// SetPortState sets the state of a specified interface on the ATE.
Expand Down
6 changes: 3 additions & 3 deletions internal/ate/ixate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import (
"time"

log "github.com/golang/glog"
"google.golang.org/grpc"
"github.com/openconfig/ondatra/binding"
"github.com/openconfig/ondatra/binding/ixweb"
"github.com/openconfig/ondatra/internal/ixconfig"
"github.com/openconfig/ondatra/internal/ixgnmi"
"github.com/openconfig/ondatra/internal/rawapis"

gpb "github.com/openconfig/gnmi/proto/gnmi"
opb "github.com/openconfig/ondatra/proto"
Expand Down Expand Up @@ -1409,11 +1409,11 @@ func (ix *ixATE) StopAllTraffic(ctx context.Context) error {
}

// FetchGNMI returns the GNMI client for the Ixia.
func (ix *ixATE) FetchGNMI(ctx context.Context, opts ...grpc.DialOption) (gpb.GNMIClient, error) {
func (ix *ixATE) FetchGNMI(ctx context.Context) (gpb.GNMIClient, error) {
ix.mu.Lock()
defer ix.mu.Unlock()
if ix.gclient == nil {
gclient, err := ixgnmi.NewClient(ctx, ix.name, ix.readStats, unwrapClient(ix.c), opts...)
gclient, err := ixgnmi.NewClient(ctx, ix.name, ix.readStats, unwrapClient(ix.c), rawapis.CommonDialOpts...)
if err != nil {
return nil, err
}
Expand Down
80 changes: 80 additions & 0 deletions internal/rawapis/intercept.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rawapis

import (
"golang.org/x/net/context"
"fmt"
"sync/atomic"

"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
)

// withUnaryAnnotateErrors annotates every gRPC error returned by an RPC with
// the RPC request message.
func withUnaryAnnotateErrors() grpc.DialOption {
return grpc.WithUnaryInterceptor(
func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
err := invoker(ctx, method, req, reply, cc, opts...)
return maybeAnnotateErr(err, req)
})
}

// withStreamAnnotateErrors annotates every gRPC error returned by an RPC with
// the preceding RPC request message.
func withStreamAnnotateErrors() grpc.DialOption {
return grpc.WithStreamInterceptor(
func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
client, err := streamer(ctx, desc, cc, method, opts...)
if client != nil {
client = &annotateErrClient{ClientStream: client}
}
return client, err
})
}

type annotateErrClient struct {
grpc.ClientStream
req atomic.Value
}

func (c *annotateErrClient) SendMsg(m any) error {
c.req.Store(m)
err := c.ClientStream.SendMsg(m)
return maybeAnnotateErr(err, m)
}

func (c *annotateErrClient) RecvMsg(m any) error {
err := c.ClientStream.RecvMsg(m)
return maybeAnnotateErr(err, c.req.Load())
}

func maybeAnnotateErr(err error, req any) error {
// Do not annotate the error if:
// 1. err is nil: there is no error to annotate
// 2. req is nil: there is no request to annotate with
// 3. err is not a grpc error: sentinel errors like io.EOF must be preserved
if err != nil && req != nil {
if st, ok := status.FromError(err); ok {
reqText := prototext.Format(req.(proto.Message))
msg := fmt.Sprintf("error on request {\n%s}: %s", reqText, st.Message())
return status.Error(st.Code(), msg)
}
}
return err
}
184 changes: 184 additions & 0 deletions internal/rawapis/intercept_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rawapis

import (
"golang.org/x/net/context"
"fmt"
"io"
"net"
"strings"
"testing"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/local"
"google.golang.org/grpc"
"google.golang.org/grpc/status"

tgrpcpb "github.com/openconfig/ondatra/internal/rawapis/testservice"
tpb "github.com/openconfig/ondatra/internal/rawapis/testservice"
)

func TestAnnotateErrors(t *testing.T) {
interceptSrv := new(interceptServer)
srv := grpc.NewServer()
defer srv.Stop()
tgrpcpb.RegisterTestServer(srv, interceptSrv)
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening on localhost: %v", err)
}
defer lis.Close()
go srv.Serve(lis)

ctx := context.Background()
conn, err := grpc.DialContext(ctx, lis.Addr().String(),
grpc.WithTransportCredentials(local.NewCredentials()),
withUnaryAnnotateErrors(),
withStreamAnnotateErrors(),
)
if err != nil {
t.Fatalf("error dialing server: %v", err)
}
interceptClient := tgrpcpb.NewTestClient(conn)

t.Run("unary success", func(t *testing.T) {
interceptSrv.unaryErr = nil
if _, gotErr := interceptClient.SendUnary(ctx, new(tpb.TestRequest)); gotErr != nil {
t.Errorf("unaryInterceptErr got error %v, want no error", gotErr)
}
})

t.Run("unary error", func(t *testing.T) {
const (
wantCode = codes.NotFound
wantErrMsg = "unary error"
wantReqMsg = "unary msg"
)
interceptSrv.unaryErr = status.Error(wantCode, wantErrMsg)
_, gotErr := interceptClient.SendUnary(ctx, &tpb.TestRequest{Message: wantReqMsg})
if gotErr == nil {
t.Fatalf("unaryInterceptErr got no error, want error, %v", gotErr)
}
if diff := grpcErrDiff(t, gotErr, wantCode, wantErrMsg, wantReqMsg); diff != "" {
t.Errorf("unaryInterceptErr got grpc error diff:\n%s", diff)
}
})

t.Run("unary unknown error", func(t *testing.T) {
const (
wantCode = codes.Unknown
wantErrMsg = "unknown error"
wantReqMsg = "unknown msg"
)
interceptSrv.unaryErr = fmt.Errorf(wantErrMsg)
_, gotErr := interceptClient.SendUnary(ctx, &tpb.TestRequest{Message: wantReqMsg})
if gotErr == nil {
t.Fatalf("unaryInterceptErr got no error, want error, %v", gotErr)
}
if diff := grpcErrDiff(t, gotErr, wantCode, wantErrMsg, wantReqMsg); diff != "" {
t.Errorf("unaryInterceptErr got grpc error diff:\n%s", diff)
}
})

t.Run("stream recv eof", func(t *testing.T) {
interceptSrv.streamErr = nil
streamClient := mustStartStream(t, interceptClient)
mustSendMessage(t, streamClient, "msg")
if _, gotErr := streamClient.Recv(); gotErr != io.EOF {
t.Errorf("streamInterceptErr got error %v, want %v", gotErr, io.EOF)
}
})

t.Run("stream recv no send", func(t *testing.T) {
const (
wantCode = codes.PermissionDenied
wantErrMsg = "stream error"
)
interceptSrv.streamErr = status.Error(wantCode, wantErrMsg)
streamClient := mustStartStream(t, interceptClient)
_, gotErr := streamClient.Recv()
if gotErr == nil {
t.Fatalf("streamInterceptErr got no error, want error, %v", gotErr)
}
if diff := grpcErrDiff(t, gotErr, wantCode, wantErrMsg); diff != "" {
t.Errorf("streamInterceptErr got grpc error diff:\n%s", diff)
}
})

t.Run("stream recv after send", func(t *testing.T) {
const (
wantCode = codes.PermissionDenied
wantErrMsg = "stream error"
wantReqMsg = "stream msg"
)
interceptSrv.streamErr = status.Error(wantCode, wantErrMsg)
streamClient := mustStartStream(t, interceptClient)
mustSendMessage(t, streamClient, wantReqMsg)
_, gotErr := streamClient.Recv()
if gotErr == nil {
t.Fatalf("streamInterceptErr got no error, want error, %v", gotErr)
}
if diff := grpcErrDiff(t, gotErr, wantCode, wantErrMsg, wantReqMsg); diff != "" {
t.Errorf("streamInterceptErr got grpc error diff:\n%s", diff)
}
})
}

type interceptServer struct {
tgrpcpb.UnimplementedTestServer
unaryErr error
streamErr error
}

func (s *interceptServer) SendUnary(context.Context, *tpb.TestRequest) (*tpb.TestResponse, error) {
return new(tpb.TestResponse), s.unaryErr
}

func (s *interceptServer) SendStream(srv tgrpcpb.Test_SendStreamServer) error {
return s.streamErr
}

func mustStartStream(t *testing.T, client tgrpcpb.TestClient) tgrpcpb.Test_SendStreamClient {
streamClient, err := client.SendStream(context.Background())
if err != nil {
t.Fatalf("SendStream failed with error: %v", err)
}
return streamClient
}

func mustSendMessage(t *testing.T, streamClient tgrpcpb.Test_SendStreamClient, msg string) {
if err := streamClient.Send(&tpb.TestRequest{Message: msg}); err != nil {
t.Fatalf("Send failed with error: %v", err)
}
}

func grpcErrDiff(t *testing.T, err error, wantCode codes.Code, wantSubstrs ...string) string {
st, ok := status.FromError(err)
if !ok {
t.Fatalf("status.FromError failed to convert error: %v", err)
}
gotCode, gotMsg := st.Code(), st.Message()
var lines []string
if gotCode != wantCode {
lines = append(lines, fmt.Sprintf("want code %v, got %v", wantCode, gotCode))
}
for _, wantSub := range wantSubstrs {
if !strings.Contains(gotMsg, wantSub) {
lines = append(lines, fmt.Sprintf("want message containing %q, got %q", wantSub, gotMsg))
}
}
return strings.Join(lines, "\n")
}
21 changes: 15 additions & 6 deletions internal/rawapis/rawapis.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ package rawapis
import (
"golang.org/x/net/context"
"fmt"
"math"
"sync"

"google.golang.org/grpc"
Expand All @@ -37,6 +36,14 @@ import (
p4pb "github.com/p4lang/p4runtime/go/p4/v1"
)

// CommonDialOpts to include in all gRPC dial calls.
// TODO: Unexport once IxNetwork is removed.
var CommonDialOpts = []grpc.DialOption{
grpc.WithBlock(),
withUnaryAnnotateErrors(),
withStreamAnnotateErrors(),
}

// NewCLI creates a CLI client for the specified DUT.
func NewCLI(ctx context.Context, dut binding.DUT) (binding.StreamClient, error) {
return dut.DialCLI(ctx)
Expand All @@ -54,7 +61,7 @@ var (

// NewGNMI creates a new gNMI client for the specified DUT.
func NewGNMI(ctx context.Context, dut binding.DUT) (gpb.GNMIClient, error) {
return dut.DialGNMI(ctx, grpc.WithBlock(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)))
return dut.DialGNMI(ctx, CommonDialOpts...)
}

// FetchGNMI fetches the cached gNMI client for the specified DUT.
Expand All @@ -80,7 +87,7 @@ var (

// NewGNOI creates a gNOI client for the specified DUT.
func NewGNOI(ctx context.Context, dut binding.DUT) (binding.GNOIClients, error) {
return dut.DialGNOI(ctx, grpc.WithBlock())
return dut.DialGNOI(ctx, CommonDialOpts...)
}

// FetchGNOI fetches the cached gNOI client for the specified DUT.
Expand All @@ -106,7 +113,7 @@ var (

// NewGRIBI creates a new gRIBI client for the specified DUT.
func NewGRIBI(ctx context.Context, dut binding.DUT) (grpb.GRIBIClient, error) {
return dut.DialGRIBI(ctx, grpc.WithBlock())
return dut.DialGRIBI(ctx, CommonDialOpts...)
}

// FetchGRIBI fetches the cached gRIBI client for the specified DUT.
Expand All @@ -132,7 +139,7 @@ var (

// NewP4RT creates a new P4RT client for the specified DUT.
func NewP4RT(ctx context.Context, dut binding.DUT) (p4pb.P4RuntimeClient, error) {
return dut.DialP4RT(ctx, grpc.WithBlock())
return dut.DialP4RT(ctx, CommonDialOpts...)
}

// FetchP4RT fetches the cached P4RT client for the specified DUT.
Expand Down Expand Up @@ -184,6 +191,8 @@ func FetchOTG(ctx context.Context, ate binding.ATE) (gosnappi.GosnappiApi, error
c, ok := otgs[ate]
if !ok {
var err error
// TODO: Add common dial options here when/if supported:
// https://github.com/open-traffic-generator/snappi/issues/192
c, err = ate.DialOTG(ctx)
if err != nil {
return nil, fmt.Errorf("error dialing OTG: %w", err)
Expand All @@ -205,7 +214,7 @@ func FetchOTGGNMI(ctx context.Context, ate binding.ATE) (gpb.GNMIClient, error)
c, ok := otgGNMIs[ate]
if !ok {
var err error
c, err = ate.DialGNMI(ctx)
c, err = ate.DialGNMI(ctx, CommonDialOpts...)
if err != nil {
return nil, fmt.Errorf("error dialing OTG GNMI: %w", err)
}
Expand Down
Loading

0 comments on commit 3bb84a4

Please sign in to comment.