Skip to content

Commit

Permalink
Universal Profiling: Versioned RPC protocol (#10573)
Browse files Browse the repository at this point in the history
Added checks for RPC protocol version mismatches.
  • Loading branch information
christos68k authored Apr 3, 2023
1 parent 01f482c commit d4b7b71
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 21 deletions.
4 changes: 3 additions & 1 deletion systemtest/profiling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ func TestProfiling(t *testing.T) {
ctx := metadata.AppendToOutgoingContext(context.Background(),
"secretToken", secretToken,
"projectID", "123",
"hostID", "abc123")
"hostID", "abc123",
"rpcVersion", "1",
)

// We always insert 2 elements in KV indices for each test RPC below.
// All RPCs use a columnar format, where arrays of fields are bundled
Expand Down
74 changes: 61 additions & 13 deletions x-pack/apm-server/profiling/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,26 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"github.com/elastic/apm-server/internal/beater/auth"
"github.com/elastic/apm-server/internal/beater/headers"
)

var (
// This is the version of the gRPC protocol specified in collection_agent.proto.
// It's retrieved and cached in NewCollector and used to check for protocol mismatches
// against the protocol version that the client sends.
rpcProtocolVersion uint32
)

// GetRPCVersion returns the version of the RPC protocol
func GetRPCVersion() uint32 {
// Retrieve protocol version defined in the .proto file
options := File_collection_agent_proto.Options()
return proto.GetExtension(options, E_Version).(uint32)
}

// AuthenticateUnaryCall implements the interceptors.UnaryAuthenticator
// interface, extracting the secret token supplied by the Host Agent,
// which we will treat the same as an APM secret token.
Expand All @@ -27,29 +42,62 @@ func (e *ElasticCollector) AuthenticateUnaryCall(
) (auth.AuthenticationDetails, auth.Authorizer, error) {
md, _ := metadata.FromIncomingContext(ctx)
secretToken := GetFirstOrEmpty(md, MetadataKeySecretToken)
projectID := GetFirstOrEmpty(md, MetadataKeyProjectID)
hostID := GetFirstOrEmpty(md, MetadataKeyHostID)
projectIDStr := GetFirstOrEmpty(md, MetadataKeyProjectID)
hostIDStr := GetFirstOrEmpty(md, MetadataKeyHostID)
rpcVersionStr := GetFirstOrEmpty(md, MetadataKeyRPCVersion)

if secretToken == "" {
return auth.AuthenticationDetails{}, nil, status.Errorf(codes.Unauthenticated, "secret token is missing")
return auth.AuthenticationDetails{}, nil,
status.Errorf(codes.FailedPrecondition, "secret token is missing")
}
if projectIDStr == "" {
return auth.AuthenticationDetails{}, nil,
status.Errorf(codes.FailedPrecondition, "project ID is missing")
}
if projectID == "" {
return auth.AuthenticationDetails{}, nil, status.Errorf(codes.Unauthenticated, "project ID is missing")
if hostIDStr == "" {
return auth.AuthenticationDetails{}, nil,
status.Errorf(codes.FailedPrecondition, "host ID is missing")
}
if hostID == "" {
return auth.AuthenticationDetails{}, nil, status.Errorf(codes.Unauthenticated, "host ID is missing")
if rpcVersionStr == "" {
return auth.AuthenticationDetails{}, nil,
status.Errorf(codes.FailedPrecondition, "RPC version is missing")
}

if _, err := strconv.Atoi(projectID); err != nil {
if _, err := strconv.ParseUint(projectIDStr, 10, 32); err != nil {
e.logger.Errorf("possible malicious client request, "+
"converting project ID from string (%s) to uint failed: %v", projectID, err)
return auth.AuthenticationDetails{}, nil, auth.ErrAuthFailed
"converting project ID from string (%s) to uint failed: %v", projectIDStr, err)
return auth.AuthenticationDetails{}, nil,
status.Errorf(codes.FailedPrecondition, "invalid project ID")
}

if _, err := strconv.ParseUint(hostID, 16, 64); err != nil {
if _, err := strconv.ParseUint(hostIDStr, 16, 64); err != nil {
e.logger.Errorf("possible malicious client request, "+
"converting host ID from string (%s) to uint failed: %v", hostID, err)
return auth.AuthenticationDetails{}, nil, auth.ErrAuthFailed
"converting host ID from string (%s) to uint failed: %v", hostIDStr, err)
return auth.AuthenticationDetails{}, nil,
status.Errorf(codes.FailedPrecondition, "invalid host ID")
}

rpcVersion64, err := strconv.ParseUint(rpcVersionStr, 10, 32)
if err != nil {
e.logger.Errorf("converting RPC version from string (%s) to uint failed: %v",
rpcVersionStr, err)
return auth.AuthenticationDetails{}, nil,
status.Errorf(codes.FailedPrecondition, "invalid RPC version")
}

rpcVersion := uint32(rpcVersion64)
if rpcVersion != rpcProtocolVersion {
e.logger.Errorf("incompatible RPC version: %d => %d", rpcVersion, rpcProtocolVersion)

if rpcVersion < rpcProtocolVersion {
return auth.AuthenticationDetails{}, nil,
status.Errorf(codes.FailedPrecondition,
"HostAgent version is unsupported, please upgrade to the latest version")
}

return auth.AuthenticationDetails{}, nil,
status.Errorf(codes.FailedPrecondition,
"Backend is incompatible with HostAgent, please check your configuration")
}

return authenticator.Authenticate(ctx, headers.Bearer, secretToken)
Expand Down
9 changes: 6 additions & 3 deletions x-pack/apm-server/profiling/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ type ElasticCollector struct {
clusterID string
}

// NewCollector returns a new ElasticCollector uses indexer for storing stack trace data in
// Elasticsearch, and metricsIndexer for storing host agent metrics. Separate indexers are
// used to allow for host agent metrics to be sent to a separate monitoring cluster.
// NewCollector returns a new ElasticCollector which uses indexer for storing stack trace
// data in Elasticsearch, and metricsIndexer for storing host agent metrics. Separate
// indexers are used to allow for host agent metrics to be sent to a separate monitoring
// cluster.
func NewCollector(
indexer esutil.BulkIndexer,
metricsIndexer esutil.BulkIndexer,
Expand All @@ -104,6 +105,8 @@ func NewCollector(
c.indexes[i] = fmt.Sprintf("%s-%dpow%02d", common.EventsIndexPrefix,
common.SamplingFactor, i+1)
}

rpcProtocolVersion = GetRPCVersion()
return c
}

Expand Down
9 changes: 5 additions & 4 deletions x-pack/apm-server/profiling/grpcext.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
MetadataKeyHostname = "hostname"
MetadataKeyKernelVersion = "kernelVersion"
MetadataKeyHostID = "hostID"
MetadataKeyRPCVersion = "rpcVersion"
// Tags will be auto base64 encoded/decoded
MetadataKeyTags = "tags-bin"
)
Expand All @@ -36,17 +37,17 @@ func GetProjectID(ctx context.Context) uint32 {
// Metadata and host ID have been validated in auth interceptor,
// no need to error check here.
md, _ := metadata.FromIncomingContext(ctx)
projectIDs := GetFirstOrEmpty(md, MetadataKeyProjectID)
projectID64, _ := strconv.Atoi(projectIDs)
projectIDStr := GetFirstOrEmpty(md, MetadataKeyProjectID)
projectID64, _ := strconv.ParseUint(projectIDStr, 10, 32)
return uint32(projectID64)
}

func GetHostID(ctx context.Context) uint64 {
// Metadata and host ID have been validated in auth interceptor,
// no need to error check here.
md, _ := metadata.FromIncomingContext(ctx)
hostIDs := GetFirstOrEmpty(md, MetadataKeyHostID)
hostID, _ := strconv.ParseUint(hostIDs, 16, 64)
hostIDStr := GetFirstOrEmpty(md, MetadataKeyHostID)
hostID, _ := strconv.ParseUint(hostIDStr, 16, 64)
return hostID
}

Expand Down

0 comments on commit d4b7b71

Please sign in to comment.