Skip to content

Commit

Permalink
Remove Status on scandeps startup in favor of Capabilities call
Browse files Browse the repository at this point in the history
Currently we do both calls but this is redundant because we throw away the results of the status call anyways.

Also took this oppertunity to remove some testonly global variables so
that tests can run in parallel

Test: Presubmits pass
Bug: b/351056464
Change-Id: I45470d4a52410c53cfa57cf3aa284757681094da
GitOrigin-RevId: f13f1e762395eedc6cc389c9a036918c5419df1c
  • Loading branch information
bentekkie authored and copybara-github committed Jul 5, 2024
1 parent 2b819e0 commit b83c668
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 113 deletions.
9 changes: 5 additions & 4 deletions internal/pkg/cppdependencyscanner/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# gazelle:ignore
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
Expand All @@ -9,10 +8,12 @@ go_library(
deps = [
"//api/scandeps",
"//internal/pkg/cppdependencyscanner/depsscannerclient",
"//internal/pkg/logger",
"//internal/pkg/ipc",
"@com_github_bazelbuild_remote_apis_sdks//go/pkg/command",
"@com_github_bazelbuild_remote_apis_sdks//go/pkg/filemetadata",
"@com_github_bazelbuild_remote_apis_sdks//go/pkg/outerr",
"@com_github_golang_glog//:go_default_library",
"@com_github_bazelbuild_remote_apis_sdks//go/pkg/retry",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//types/known/emptypb",
],
)
42 changes: 41 additions & 1 deletion internal/pkg/cppdependencyscanner/cppdepscanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ package cppdependencyscanner
import (
"context"
"errors"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

pb "github.com/bazelbuild/reclient/api/scandeps"
"github.com/bazelbuild/reclient/internal/pkg/cppdependencyscanner/depsscannerclient"
"github.com/bazelbuild/reclient/internal/pkg/ipc"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/command"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/outerr"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/retry"

spb "github.com/bazelbuild/reclient/api/scandeps"
)
Expand All @@ -49,9 +57,41 @@ var (
// ErrDepsScanTimeout is the error returned by the input processor
// when it times out during the dependency scanning phase.
ErrDepsScanTimeout = errors.New("cpp dependency scanner timed out")
backoff = retry.ExponentialBackoff(1*time.Second, 10*time.Second, retry.Attempts(10))
shouldRetry = func(err error) bool {
if err == context.DeadlineExceeded {
return true
}
s, ok := status.FromError(err)
if !ok {
return false
}
switch s.Code() {
case codes.Canceled, codes.Unknown, codes.DeadlineExceeded, codes.Unavailable, codes.ResourceExhausted:
return true
default:
return false
}
}
)

// TODO (b/258275137): Move this to it's own package with a unit test
var connect = func(ctx context.Context, address string) (pb.CPPDepsScannerClient, *pb.CapabilitiesResponse, error) {
conn, err := ipc.DialContext(ctx, address)
if err != nil {
return nil, nil, err
}
client := pb.NewCPPDepsScannerClient(conn)
var capabilities *pb.CapabilitiesResponse
err = retry.WithPolicy(ctx, shouldRetry, backoff, func() error {
capabilities, err = client.Capabilities(ctx, &emptypb.Empty{})
return err
})
return client, capabilities, nil
}

// New creates new DepsScanner.
func New(ctx context.Context, executor executor, cacheDir, logDir string, cacheSizeMaxMb int, useDepsCache bool, depsScannerAddress, proxyServerAddress string) (DepsScanner, error) {
return depsscannerclient.New(ctx, executor, cacheDir, cacheSizeMaxMb, useDepsCache, logDir, depsScannerAddress, proxyServerAddress)
// TODO (b/258275137): make connTimeout configurable and move somewhere more appropriate when reconnect logic is implemented.
return depsscannerclient.New(ctx, executor, cacheDir, cacheSizeMaxMb, useDepsCache, logDir, depsScannerAddress, proxyServerAddress, 30*time.Second, connect)
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,24 +110,10 @@ var (
}
)

var connect = func(ctx context.Context, address string) (pb.CPPDepsScannerClient, error) {
conn, err := ipc.DialContext(ctx, address)
if err != nil {
return nil, err
}
client := pb.NewCPPDepsScannerClient(conn)
err = retry.WithPolicy(ctx, shouldRetry, backoff, func() error {
_, err = client.Status(ctx, &emptypb.Empty{})
return err
})
return client, nil
}

// TODO (b/258275137): make this configurable and move somewhere more appropriate when reconnect logic is implemented.
var connTimeout = 30 * time.Second
type connectFn func(ctx context.Context, address string) (pb.CPPDepsScannerClient, *pb.CapabilitiesResponse, error)

// New creates new DepsScannerClient.
func New(ctx context.Context, executor executor, cacheDir string, cacheFileMaxMb int, useDepsCache bool, logDir string, depsScannerAddress, proxyServerAddress string) (*DepsScannerClient, error) {
func New(ctx context.Context, executor executor, cacheDir string, cacheFileMaxMb int, useDepsCache bool, logDir string, depsScannerAddress, proxyServerAddress string, connTimeout time.Duration, connect connectFn) (*DepsScannerClient, error) {
log.Infof("Connecting to remote dependency scanner: %v", depsScannerAddress)
client := &DepsScannerClient{
address: depsScannerAddress,
Expand All @@ -153,19 +139,21 @@ func New(ctx context.Context, executor executor, cacheDir string, cacheFileMaxMb
}

type connectResponse struct {
client pb.CPPDepsScannerClient
err error
client pb.CPPDepsScannerClient
capabilities *pb.CapabilitiesResponse
err error
}
connectCh := make(chan connectResponse)
ctx, cancel := context.WithTimeout(ctx, connTimeout)
defer cancel()
go func() {
defer close(connectCh)
client, err := connect(ctx, client.address)
client, capabilities, err := connect(ctx, client.address)
select {
case connectCh <- connectResponse{
client: client,
err: err,
client: client,
capabilities: capabilities,
err: err,
}:
case <-ctx.Done():
}
Expand All @@ -182,9 +170,7 @@ func New(ctx context.Context, executor executor, cacheDir string, cacheFileMaxMb
}
log.Infof("Connected to dependency scanner service on %v", client.address)
client.client = c.client
if err := client.updateCapabilities(ctx); err != nil {
return nil, fmt.Errorf("Failed to update capabilities: %w", err)
}
client.updateCapabilities(c.capabilities)
return client, nil
}
}
Expand Down Expand Up @@ -396,18 +382,10 @@ func (ds *DepsScannerClient) ProcessInputs(ctx context.Context, execID string, c
}
}

func (ds *DepsScannerClient) updateCapabilities(ctx context.Context) error {
if ds.client == nil {
return nil
}
capabilities, err := ds.client.Capabilities(ctx, &emptypb.Empty{})
if err != nil {
return err
}
func (ds *DepsScannerClient) updateCapabilities(capabilities *pb.CapabilitiesResponse) {
ds.capabilitiesMu.Lock()
defer ds.capabilitiesMu.Unlock()
ds.capabilities = capabilities
return nil
}

// Capabilities implements DepsScanner.Capabilities.
Expand All @@ -424,7 +402,7 @@ func (ds *DepsScannerClient) verifyService(ctx context.Context) error {
sctx, cancel := context.WithTimeout(ds.ctx, timeout)
defer cancel()

_, err := ds.client.Status(sctx, &emptypb.Empty{})
capabilities, err := ds.client.Capabilities(sctx, &emptypb.Empty{})

select {
case <-ctx.Done():
Expand All @@ -433,9 +411,7 @@ func (ds *DepsScannerClient) verifyService(ctx context.Context) error {
default:
// success?
if err == nil {
if err := ds.updateCapabilities(ctx); err != nil {
return err
}
ds.updateCapabilities(capabilities)
return nil
} // else
// Status call may return an error before the 10 seconds timeout expires if it isn't
Expand Down
Loading

0 comments on commit b83c668

Please sign in to comment.