From b83c668bdfd82fec96403e437bb6c343f053f994 Mon Sep 17 00:00:00 2001 From: Ben Segall Date: Thu, 4 Jul 2024 15:47:08 +0000 Subject: [PATCH] Remove Status on scandeps startup in favor of Capabilities call Currently we do both calls but this is redundant because we throw away the results of the status call anyways. Also took this oppertunity to remove some testonly global variables so that tests can run in parallel Test: Presubmits pass Bug: b/351056464 Change-Id: I45470d4a52410c53cfa57cf3aa284757681094da GitOrigin-RevId: f13f1e762395eedc6cc389c9a036918c5419df1c --- internal/pkg/cppdependencyscanner/BUILD.bazel | 9 +- .../pkg/cppdependencyscanner/cppdepscanner.go | 42 ++++++- .../depsscannerclient/depsscannerclient.go | 50 +++------ .../depsscannerclient_test.go | 105 ++++++------------ 4 files changed, 93 insertions(+), 113 deletions(-) diff --git a/internal/pkg/cppdependencyscanner/BUILD.bazel b/internal/pkg/cppdependencyscanner/BUILD.bazel index bf823a49..964dcf2b 100644 --- a/internal/pkg/cppdependencyscanner/BUILD.bazel +++ b/internal/pkg/cppdependencyscanner/BUILD.bazel @@ -1,4 +1,3 @@ -# gazelle:ignore load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( @@ -9,10 +8,12 @@ go_library( deps = [ "//api/scandeps", "//internal/pkg/cppdependencyscanner/depsscannerclient", - "//internal/pkg/logger", + "//internal/pkg/ipc", "@com_github_bazelbuild_remote_apis_sdks//go/pkg/command", - "@com_github_bazelbuild_remote_apis_sdks//go/pkg/filemetadata", "@com_github_bazelbuild_remote_apis_sdks//go/pkg/outerr", - "@com_github_golang_glog//:go_default_library", + "@com_github_bazelbuild_remote_apis_sdks//go/pkg/retry", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", + "@org_golang_google_protobuf//types/known/emptypb", ], ) diff --git a/internal/pkg/cppdependencyscanner/cppdepscanner.go b/internal/pkg/cppdependencyscanner/cppdepscanner.go index f31bd5f5..60932b34 100644 --- a/internal/pkg/cppdependencyscanner/cppdepscanner.go +++ b/internal/pkg/cppdependencyscanner/cppdepscanner.go @@ -21,11 +21,19 @@ package cppdependencyscanner import ( "context" "errors" + "time" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" + + pb "github.com/bazelbuild/reclient/api/scandeps" "github.com/bazelbuild/reclient/internal/pkg/cppdependencyscanner/depsscannerclient" + "github.com/bazelbuild/reclient/internal/pkg/ipc" "github.com/bazelbuild/remote-apis-sdks/go/pkg/command" "github.com/bazelbuild/remote-apis-sdks/go/pkg/outerr" + "github.com/bazelbuild/remote-apis-sdks/go/pkg/retry" spb "github.com/bazelbuild/reclient/api/scandeps" ) @@ -49,9 +57,41 @@ var ( // ErrDepsScanTimeout is the error returned by the input processor // when it times out during the dependency scanning phase. ErrDepsScanTimeout = errors.New("cpp dependency scanner timed out") + backoff = retry.ExponentialBackoff(1*time.Second, 10*time.Second, retry.Attempts(10)) + shouldRetry = func(err error) bool { + if err == context.DeadlineExceeded { + return true + } + s, ok := status.FromError(err) + if !ok { + return false + } + switch s.Code() { + case codes.Canceled, codes.Unknown, codes.DeadlineExceeded, codes.Unavailable, codes.ResourceExhausted: + return true + default: + return false + } + } ) +// TODO (b/258275137): Move this to it's own package with a unit test +var connect = func(ctx context.Context, address string) (pb.CPPDepsScannerClient, *pb.CapabilitiesResponse, error) { + conn, err := ipc.DialContext(ctx, address) + if err != nil { + return nil, nil, err + } + client := pb.NewCPPDepsScannerClient(conn) + var capabilities *pb.CapabilitiesResponse + err = retry.WithPolicy(ctx, shouldRetry, backoff, func() error { + capabilities, err = client.Capabilities(ctx, &emptypb.Empty{}) + return err + }) + return client, capabilities, nil +} + // New creates new DepsScanner. func New(ctx context.Context, executor executor, cacheDir, logDir string, cacheSizeMaxMb int, useDepsCache bool, depsScannerAddress, proxyServerAddress string) (DepsScanner, error) { - return depsscannerclient.New(ctx, executor, cacheDir, cacheSizeMaxMb, useDepsCache, logDir, depsScannerAddress, proxyServerAddress) + // TODO (b/258275137): make connTimeout configurable and move somewhere more appropriate when reconnect logic is implemented. + return depsscannerclient.New(ctx, executor, cacheDir, cacheSizeMaxMb, useDepsCache, logDir, depsScannerAddress, proxyServerAddress, 30*time.Second, connect) } diff --git a/internal/pkg/cppdependencyscanner/depsscannerclient/depsscannerclient.go b/internal/pkg/cppdependencyscanner/depsscannerclient/depsscannerclient.go index 7236bcb4..a8f93d98 100644 --- a/internal/pkg/cppdependencyscanner/depsscannerclient/depsscannerclient.go +++ b/internal/pkg/cppdependencyscanner/depsscannerclient/depsscannerclient.go @@ -110,24 +110,10 @@ var ( } ) -var connect = func(ctx context.Context, address string) (pb.CPPDepsScannerClient, error) { - conn, err := ipc.DialContext(ctx, address) - if err != nil { - return nil, err - } - client := pb.NewCPPDepsScannerClient(conn) - err = retry.WithPolicy(ctx, shouldRetry, backoff, func() error { - _, err = client.Status(ctx, &emptypb.Empty{}) - return err - }) - return client, nil -} - -// TODO (b/258275137): make this configurable and move somewhere more appropriate when reconnect logic is implemented. -var connTimeout = 30 * time.Second +type connectFn func(ctx context.Context, address string) (pb.CPPDepsScannerClient, *pb.CapabilitiesResponse, error) // New creates new DepsScannerClient. -func New(ctx context.Context, executor executor, cacheDir string, cacheFileMaxMb int, useDepsCache bool, logDir string, depsScannerAddress, proxyServerAddress string) (*DepsScannerClient, error) { +func New(ctx context.Context, executor executor, cacheDir string, cacheFileMaxMb int, useDepsCache bool, logDir string, depsScannerAddress, proxyServerAddress string, connTimeout time.Duration, connect connectFn) (*DepsScannerClient, error) { log.Infof("Connecting to remote dependency scanner: %v", depsScannerAddress) client := &DepsScannerClient{ address: depsScannerAddress, @@ -153,19 +139,21 @@ func New(ctx context.Context, executor executor, cacheDir string, cacheFileMaxMb } type connectResponse struct { - client pb.CPPDepsScannerClient - err error + client pb.CPPDepsScannerClient + capabilities *pb.CapabilitiesResponse + err error } connectCh := make(chan connectResponse) ctx, cancel := context.WithTimeout(ctx, connTimeout) defer cancel() go func() { defer close(connectCh) - client, err := connect(ctx, client.address) + client, capabilities, err := connect(ctx, client.address) select { case connectCh <- connectResponse{ - client: client, - err: err, + client: client, + capabilities: capabilities, + err: err, }: case <-ctx.Done(): } @@ -182,9 +170,7 @@ func New(ctx context.Context, executor executor, cacheDir string, cacheFileMaxMb } log.Infof("Connected to dependency scanner service on %v", client.address) client.client = c.client - if err := client.updateCapabilities(ctx); err != nil { - return nil, fmt.Errorf("Failed to update capabilities: %w", err) - } + client.updateCapabilities(c.capabilities) return client, nil } } @@ -396,18 +382,10 @@ func (ds *DepsScannerClient) ProcessInputs(ctx context.Context, execID string, c } } -func (ds *DepsScannerClient) updateCapabilities(ctx context.Context) error { - if ds.client == nil { - return nil - } - capabilities, err := ds.client.Capabilities(ctx, &emptypb.Empty{}) - if err != nil { - return err - } +func (ds *DepsScannerClient) updateCapabilities(capabilities *pb.CapabilitiesResponse) { ds.capabilitiesMu.Lock() defer ds.capabilitiesMu.Unlock() ds.capabilities = capabilities - return nil } // Capabilities implements DepsScanner.Capabilities. @@ -424,7 +402,7 @@ func (ds *DepsScannerClient) verifyService(ctx context.Context) error { sctx, cancel := context.WithTimeout(ds.ctx, timeout) defer cancel() - _, err := ds.client.Status(sctx, &emptypb.Empty{}) + capabilities, err := ds.client.Capabilities(sctx, &emptypb.Empty{}) select { case <-ctx.Done(): @@ -433,9 +411,7 @@ func (ds *DepsScannerClient) verifyService(ctx context.Context) error { default: // success? if err == nil { - if err := ds.updateCapabilities(ctx); err != nil { - return err - } + ds.updateCapabilities(capabilities) return nil } // else // Status call may return an error before the 10 seconds timeout expires if it isn't diff --git a/internal/pkg/cppdependencyscanner/depsscannerclient/depsscannerclient_test.go b/internal/pkg/cppdependencyscanner/depsscannerclient/depsscannerclient_test.go index 2b76671b..9f9274e2 100644 --- a/internal/pkg/cppdependencyscanner/depsscannerclient/depsscannerclient_test.go +++ b/internal/pkg/cppdependencyscanner/depsscannerclient/depsscannerclient_test.go @@ -55,6 +55,8 @@ type stubExecutor struct { type testService struct { // A fake gRPC client to return from connect. stubClient *stubClient + // A fake capabilities response + capabilities *pb.CapabilitiesResponse // The number of times connect should fail before returning stubClient. connectDelay time.Time // Output: the number of times connect() was called. @@ -62,18 +64,18 @@ type testService struct { } // connect returns a preset stubClient. -func (s *testService) connect(ctx context.Context, address string) (pb.CPPDepsScannerClient, error) { +func (s *testService) connect(ctx context.Context, address string) (pb.CPPDepsScannerClient, *pb.CapabilitiesResponse, error) { s.connectCount.Add(1) select { case <-time.After(time.Until(s.connectDelay)): // Sleep, simulate a slow connection that may or may not timeout case <-ctx.Done(): - return nil, errors.New("Connection timed out") + return nil, nil, errors.New("Connection timed out") } if s.stubClient != nil { - return s.stubClient, nil + return s.stubClient, s.capabilities, nil } - return nil, errors.New("Connection not ready yet") + return nil, nil, errors.New("Connection not ready yet") } // A stub CPPDepsScannerClient. @@ -149,15 +151,11 @@ var ( // TestNew_ConnectSuccess tests that a call to New() can connect to an already running dependency // scanner service. func TestNew_ConnectSuccess(t *testing.T) { + t.Parallel() testService := &testService{ stubClient: &stubClient{}, } - oldConnect := connect - connect = testService.connect - t.Cleanup(func() { - connect = oldConnect - }) - depsScannerClient, err := New(context.Background(), nil, "", 0, false, "", "127.0.0.1:8001", "127.0.0.1:1000") + depsScannerClient, err := New(context.Background(), nil, "", 0, false, "", "127.0.0.1:8001", "127.0.0.1:1000", 30*time.Second, testService.connect) if err != nil { t.Errorf("New() retured unexpected error: %v", err) } @@ -172,17 +170,12 @@ func TestNew_ConnectSuccess(t *testing.T) { // TestNew_ConnectFailure tests that a call to New() will fail if no dependency scanner service is // running when expected. func TestNew_ConnectFailure(t *testing.T) { - setConnTimeout(t, 500*time.Millisecond) + t.Parallel() testService := &testService{ stubClient: nil, connectDelay: time.Now().Add(5 * time.Second), // Wait for 5 seconds before "accepting" the connection } - oldConnect := connect - connect = testService.connect - t.Cleanup(func() { - connect = oldConnect - }) - depsScannerClient, err := New(context.Background(), nil, "", 0, false, "", "127.0.0.1:8001", "127.0.0.1:1000") + depsScannerClient, err := New(context.Background(), nil, "", 0, false, "", "127.0.0.1:8001", "127.0.0.1:1000", 500*time.Millisecond, testService.connect) if err == nil { t.Errorf("New() did not return expected error") } @@ -198,16 +191,17 @@ func TestNew_ConnectFailure(t *testing.T) { // TestNew_StartSuccess tests that a call to New() will start and connect to a dependency scanner // service executable. func TestNew_StartSuccess(t *testing.T) { + t.Parallel() + fakeCapabilities := &pb.CapabilitiesResponse{ + Caching: false, + ExpectsResourceDir: true, + } testService := &testService{ - stubClient: &stubClient{}, + stubClient: &stubClient{}, + capabilities: fakeCapabilities, } - oldConnect := connect - connect = testService.connect - t.Cleanup(func() { - connect = oldConnect - }) stubExecutor := &stubExecutor{} - depsScannerClient, err := New(context.Background(), stubExecutor, "", 0, false, "", "exec://test_exec", "127.0.0.1:1000") + depsScannerClient, err := New(context.Background(), stubExecutor, "", 0, false, "", "exec://test_exec", "127.0.0.1:1000", 30*time.Second, testService.connect) if err != nil { t.Errorf("New() retured unexpected error: %v", err) } @@ -238,23 +232,22 @@ func TestNew_StartSuccess(t *testing.T) { default: // No Cancel() call. Expected. } + if got := depsScannerClient.Capabilities(); got != fakeCapabilities { + t.Errorf("Capabilities() returned unexpected value, wanted %v, got %v", fakeCapabilities, got) + } } // TestNew_StartFailure tests that a call to New() will fail if an executable is provided but cannot // be started. func TestNew_StartFailure(t *testing.T) { + t.Parallel() testService := &testService{ stubClient: &stubClient{}, } - oldConnect := connect - connect = testService.connect - t.Cleanup(func() { - connect = oldConnect - }) stubExecutor := &stubExecutor{ err: errors.New("File not found"), } - depsScannerClient, err := New(context.Background(), stubExecutor, "", 0, false, "", "exec://test_exec", "127.0.0.1:1000") + depsScannerClient, err := New(context.Background(), stubExecutor, "", 0, false, "", "exec://test_exec", "127.0.0.1:1000", 30*time.Second, testService.connect) if err == nil { t.Errorf("New() did not return expected error") } @@ -284,17 +277,12 @@ func TestNew_StartFailure(t *testing.T) { // TestNew_StartNoConnect tests that New() will error if it is able to successfully start a // dependency scanner service, but is unable to connect to it for any reason. func TestNew_StartNoConnect(t *testing.T) { - setConnTimeout(t, 500*time.Millisecond) + t.Parallel() testService := &testService{ connectDelay: time.Now().Add(5 * time.Second), // Wait for 5 seconds before "accepting" the connection } - oldConnect := connect - connect = testService.connect - t.Cleanup(func() { - connect = oldConnect - }) stubExecutor := &stubExecutor{} - depsScannerClient, err := New(context.Background(), stubExecutor, "", 0, false, "", "exec://test_exec", "127.0.0.1:1000") + depsScannerClient, err := New(context.Background(), stubExecutor, "", 0, false, "", "exec://test_exec", "127.0.0.1:1000", 500*time.Millisecond, testService.connect) if err == nil { t.Errorf("New() did not return expected error") } @@ -320,17 +308,13 @@ func TestNew_StartNoConnect(t *testing.T) { // dependency scanner service takes a little while (more than 5 seconds, less than 15) to become // available. func TestNew_StartDelayedConnect(t *testing.T) { + t.Parallel() testService := &testService{ stubClient: &stubClient{}, connectDelay: time.Now().Add(5 * time.Second), // Wait for 5 seconds before "accepting" the connection } - oldConnect := connect - connect = testService.connect - t.Cleanup(func() { - connect = oldConnect - }) stubExecutor := &stubExecutor{} - depsScannerClient, err := New(context.Background(), stubExecutor, "", 0, false, "", "exec://test_exec", "127.0.0.1:1000") + depsScannerClient, err := New(context.Background(), stubExecutor, "", 0, false, "", "exec://test_exec", "127.0.0.1:1000", 30*time.Second, testService.connect) if err != nil { t.Errorf("New() retured unexpected error: %v", err) } @@ -360,6 +344,7 @@ func TestNew_StartDelayedConnect(t *testing.T) { // TestStopService_Success tests that a call to stopService (trivially called by the exported // Close() function) will properly stop the service. func TestStopService_Success(t *testing.T) { + t.Parallel() terminateCalled := 0 stubClient := &stubClient{} depsScannerClient := &DepsScannerClient{ @@ -404,6 +389,7 @@ func TestStopService_Success(t *testing.T) { // function) will error if it was unable to verify the service had been shutdown after a fixed // timeout. func TestStopService_Failure(t *testing.T) { + t.Parallel() terminateCalled := 0 stubClient := &stubClient{} depsScannerClient := &DepsScannerClient{ @@ -438,6 +424,7 @@ func TestStopService_Failure(t *testing.T) { } func TestProcessInputs_nocache(t *testing.T) { + t.Parallel() execID := uuid.New().String() fileList := []string{ filename, @@ -473,6 +460,7 @@ func TestProcessInputs_nocache(t *testing.T) { } func TestProcessInputs_abspath(t *testing.T) { + t.Parallel() execID := uuid.New().String() fileList := []string{ filename, @@ -512,6 +500,7 @@ func TestProcessInputs_abspath(t *testing.T) { } func TestProcessInputs_cache(t *testing.T) { + t.Parallel() execID := uuid.New().String() fileList := []string{ filename, @@ -548,6 +537,7 @@ func TestProcessInputs_cache(t *testing.T) { } func TestProcessInputs_remoteError(t *testing.T) { + t.Parallel() execID := uuid.New().String() stubClient := &stubClient{ processInputsResponse: &pb.CPPProcessInputsResponse{ @@ -576,6 +566,7 @@ func TestProcessInputs_remoteError(t *testing.T) { } func TestProcessInputs_timeoutError(t *testing.T) { + t.Parallel() execID := uuid.New().String() stubClient := &stubClient{ processInputsResponse: &pb.CPPProcessInputsResponse{ @@ -787,25 +778,6 @@ func TestBuildAddress(t *testing.T) { } } -func TestCapabilities(t *testing.T) { - want := &pb.CapabilitiesResponse{ - Caching: false, - ExpectsResourceDir: true, - } - stubClient := &stubClient{ - capabilitiesResponse: want, - } - client := &DepsScannerClient{ - ctx: context.Background(), - client: stubClient, - } - client.updateCapabilities(context.Background()) - - if got := client.Capabilities(); got != want { - t.Errorf("Capabilities() returned unexpected value, wanted %v, got %v", want, got) - } -} - func runForPlatforms(t *testing.T, name string, platforms []string, test func(t *testing.T)) { t.Helper() for _, platform := range platforms { @@ -815,12 +787,3 @@ func runForPlatforms(t *testing.T, name string, platforms []string, test func(t } } } - -func setConnTimeout(t *testing.T, newTimeout time.Duration) { - t.Helper() - oldConnTimeout := connTimeout - connTimeout = newTimeout - t.Cleanup(func() { - connTimeout = oldConnTimeout - }) -}