Skip to content

Commit

Permalink
Merge pull request GoogleCloudPlatform#2529 from justinsb/pubsub_fide…
Browse files Browse the repository at this point in the history
…lity

mockgcp: support for PubSubSubscription
  • Loading branch information
google-oss-prow[bot] authored Aug 27, 2024
2 parents ec9aceb + 6679743 commit 396428b
Show file tree
Hide file tree
Showing 23 changed files with 4,304 additions and 572 deletions.
1 change: 1 addition & 0 deletions config/tests/samples/create/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ func MaybeSkip(t *testing.T, name string, resources []*unstructured.Unstructured
case schema.GroupKind{Group: "privilegedaccessmanager.cnrm.google.com", Kind: "PrivilegedAccessManagerEntitlement"}:

case schema.GroupKind{Group: "pubsub.cnrm.cloud.google.com", Kind: "PubSubSchema"}:
case schema.GroupKind{Group: "pubsub.cnrm.cloud.google.com", Kind: "PubSubSubscription"}:
case schema.GroupKind{Group: "pubsub.cnrm.cloud.google.com", Kind: "PubSubTopic"}:

case schema.GroupKind{Group: "redis.cnrm.cloud.google.com", Kind: "RedisInstance"}:
Expand Down
6 changes: 5 additions & 1 deletion mockgcp/mockdataflow/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,12 @@ func (r *jobsServer) UpdateJob(ctx context.Context, req *pb.UpdateJobRequest) (*
obj.CurrentState = pb.JobState_JOB_STATE_CANCELLING
obj.CurrentStateTime = timestamppb.New(now)
obj.RequestedState = pb.JobState_JOB_STATE_CANCELLED
case pb.JobState_JOB_STATE_DRAINING:
obj.CurrentState = pb.JobState_JOB_STATE_DRAINING
obj.CurrentStateTime = timestamppb.New(now)
obj.RequestedState = pb.JobState_JOB_STATE_DRAINING
default:
return nil, status.Errorf(codes.InvalidArgument, "unhandled requestedState in mock")
return nil, status.Errorf(codes.InvalidArgument, "unhandled requestedState %v in mock", req.GetJob())
}

if err := r.storage.Update(ctx, fqn, obj); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions mockgcp/mockdataflow/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (r *MockService) StopJob(fqn string) error {
obj.CurrentState = pb.JobState_JOB_STATE_CANCELLED
obj.CurrentStateTime = timestamppb.New(now)
obj.RequestedState = pb.JobState_JOB_STATE_UNKNOWN
case pb.JobState_JOB_STATE_DRAINING:
obj.CurrentState = pb.JobState_JOB_STATE_DRAINED
obj.CurrentStateTime = timestamppb.New(now)
obj.RequestedState = pb.JobState_JOB_STATE_UNKNOWN
default:
return fmt.Errorf("unexpected state for job %q: %v", fqn, obj.CurrentState)
}
Expand Down
2 changes: 1 addition & 1 deletion mockgcp/mockpubsub/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *MockService) ExpectedHosts() []string {

func (s *MockService) Register(grpcServer *grpc.Server) {
pb.RegisterPublisherServer(grpcServer, &publisherService{MockService: s})
// pb.RegisterSubscriberServer(grpcServer, &subscriberService{MockService: s})
pb.RegisterSubscriberServer(grpcServer, &subscriberService{MockService: s})
pb.RegisterSchemaServiceServer(grpcServer, &schemaService{MockService: s})
}

Expand Down
227 changes: 227 additions & 0 deletions mockgcp/mockpubsub/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mockpubsub

import (
"context"
"fmt"
"strings"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/emptypb"
"k8s.io/klog/v2"

"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common/projects"
pb "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/generated/mockgcp/pubsub/v1"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/pkg/storage"
)

type subscriberService struct {
*MockService
pb.UnimplementedSubscriberServer
}

func (s *subscriberService) CreateSubscription(ctx context.Context, req *pb.Subscription) (*pb.Subscription, error) {
name, err := s.parseSubscriptionName(req.Name)
if err != nil {
return nil, err
}
fqn := name.String()

obj := ProtoClone(req)
obj.Name = name.String()

obj.State = pb.Subscription_ACTIVE

s.populateDefaultsForSubscription(obj)

// Unlike many other APIs, creation is eventually consistent (not immediately visible in GET)
go func() {
ctx := context.Background()
time.Sleep(2 * time.Second)
if err := s.storage.Create(ctx, fqn, obj); err != nil {
klog.Errorf("error creaing pubsub Subscription: %v", err)
}
}()

return obj, nil
}

func (s *subscriberService) populateDefaultsForSubscription(obj *pb.Subscription) {
if obj.ExpirationPolicy == nil {
obj.ExpirationPolicy = &pb.ExpirationPolicy{
Ttl: &durationpb.Duration{
Seconds: 3600 * 24 * 31,
},
}
}

if obj.PushConfig == nil {
obj.PushConfig = &pb.PushConfig{}
}

}

func (s *subscriberService) UpdateSubscription(ctx context.Context, req *pb.UpdateSubscriptionRequest) (*pb.Subscription, error) {
reqName := req.Subscription.Name
name, err := s.parseSubscriptionName(reqName)
if err != nil {
return nil, err
}
fqn := name.String()
existing := &pb.Subscription{}
if err := s.storage.Get(ctx, fqn, existing); err != nil {
return nil, err
}

updated := ProtoClone(existing)
updated.Name = name.String()

// Required. The update mask applies to the resource.
paths := req.GetUpdateMask().GetPaths()
if len(paths) == 0 {
// Documented as required, but not passed by terraform...
// return nil, status.Errorf(codes.InvalidArgument, "update_mask is required")
updated.AckDeadlineSeconds = req.GetSubscription().AckDeadlineSeconds
updated.EnableExactlyOnceDelivery = req.GetSubscription().EnableExactlyOnceDelivery
updated.Labels = req.GetSubscription().Labels
updated.PushConfig = req.GetSubscription().PushConfig
}
// TODO: Some sort of helper for fieldmask?
for _, path := range paths {
switch path {
case "ackDeadlineSeconds":
updated.AckDeadlineSeconds = req.GetSubscription().AckDeadlineSeconds
case "enableExactlyOnceDelivery":
updated.EnableExactlyOnceDelivery = req.GetSubscription().EnableExactlyOnceDelivery
case "labels":
updated.Labels = req.GetSubscription().Labels
case "pushConfig":
updated.PushConfig = req.GetSubscription().PushConfig
// case "expirationPolicy":
// updated.ExpirationPolicy = req.GetSubscription().ExpirationPolicy
default:
return nil, status.Errorf(codes.InvalidArgument, "update_mask path %q not supported by mock", path)
}
}

s.populateDefaultsForSubscription(updated)

if err := s.storage.Update(ctx, fqn, updated); err != nil {
return nil, err
}

// Very unusual behaviour on the return value pushConfig ... maybe this is actually async also?
ret := ProtoClone(updated)
if ret.PushConfig == nil {
ret.PushConfig = &pb.PushConfig{}
}
if ret.PushConfig.Attributes == nil {
ret.PushConfig.Attributes = make(map[string]string)
}
if ret.PushConfig.Attributes["x-goog-version"] == "" {
ret.PushConfig.Attributes["x-goog-version"] = "v1"
}
return ret, nil
}

func (s *subscriberService) GetSubscription(ctx context.Context, req *pb.GetSubscriptionRequest) (*pb.Subscription, error) {
name, err := s.parseSubscriptionName(req.Subscription)
if err != nil {
return nil, err
}
fqn := name.String()
obj := &pb.Subscription{}
if err := s.storage.Get(ctx, fqn, obj); err != nil {
if status.Code(err) == codes.NotFound {
return nil, status.Errorf(codes.NotFound, "Resource not found (resource=%s).", name.ID)
}
return nil, err
}

return obj, nil
}

func (s *subscriberService) ListSubscriptions(ctx context.Context, req *pb.ListSubscriptionsRequest) (*pb.ListSubscriptionsResponse, error) {
project, err := s.Projects.GetProjectByID(req.Project)
if err != nil {
return nil, err
}

findPrefix := fmt.Sprintf("projects/%v/", project.ID)

var subscriptions []*pb.Subscription

subscriptionKind := (&pb.Subscription{}).ProtoReflect().Descriptor()
if err := s.storage.List(ctx, subscriptionKind, storage.ListOptions{}, func(obj proto.Message) error {
subscription := obj.(*pb.Subscription)
if strings.HasPrefix(subscription.Name, findPrefix) {
subscriptions = append(subscriptions, subscription)
}
return nil
}); err != nil {
return nil, err
}

return &pb.ListSubscriptionsResponse{
Subscriptions: subscriptions,
NextPageToken: "",
}, nil
}

func (s *subscriberService) DeleteSubscription(ctx context.Context, req *pb.DeleteSubscriptionRequest) (*emptypb.Empty, error) {
name, err := s.parseSubscriptionName(req.Subscription)
if err != nil {
return nil, err
}
fqn := name.String()
deletedObj := &pb.Subscription{}
if err := s.storage.Delete(ctx, fqn, deletedObj); err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}

type subscriptionName struct {
Project *projects.ProjectData
ID string
}

func (n *subscriptionName) String() string {
return fmt.Sprintf("projects/%s/subscriptions/%s", n.Project.ID, n.ID)
}

// parseSubscriptionName parses a string into a subscriptionName.
// The expected form is `projects/*/subscriptions/*`.
func (s *MockService) parseSubscriptionName(name string) (*subscriptionName, error) {
tokens := strings.Split(name, "/")
if len(tokens) == 4 && tokens[0] == "projects" && tokens[2] == "subscriptions" {
project, err := s.Projects.GetProjectByID(tokens[1])
if err != nil {
return nil, err
}
name := &subscriptionName{
Project: project,
ID: tokens[3],
}
return name, nil
} else {
return nil, status.Errorf(codes.InvalidArgument, "name %q is not valid", name)
}
}
57 changes: 31 additions & 26 deletions mockgcp/mockpubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,35 +48,40 @@ func (s *publisherService) CreateTopic(ctx context.Context, req *pb.Topic) (*pb.
if err := s.storage.Create(ctx, fqn, obj); err != nil {
return nil, err
}

return obj, nil
}

func (s *publisherService) populateDefaultsForTopic(obj *pb.Topic) {
obj.MessageStoragePolicy = &pb.MessageStoragePolicy{
AllowedPersistenceRegions: []string{
"asia-east1",
"asia-northeast1",
"asia-southeast1",
"australia-southeast1",
"europe-north1",
"europe-west1",
"europe-west2",
"europe-west3",
"europe-west4",
"southamerica-west1",
"us-central1",
"us-central2",
"us-east1",
"us-east4",
"us-east5",
"us-east7",
"us-south1",
"us-west1",
"us-west2",
"us-west3",
"us-west4",
"us-west8",
},
// TODO: When _is_ this populated?
populateMessageStoragePolicy := false
if populateMessageStoragePolicy {
obj.MessageStoragePolicy = &pb.MessageStoragePolicy{
AllowedPersistenceRegions: []string{
"asia-east1",
"asia-northeast1",
"asia-southeast1",
"australia-southeast1",
"europe-north1",
"europe-west1",
"europe-west2",
"europe-west3",
"europe-west4",
"southamerica-west1",
"us-central1",
"us-central2",
"us-east1",
"us-east4",
"us-east5",
"us-east7",
"us-south1",
"us-west1",
"us-west2",
"us-west3",
"us-west4",
"us-west8",
},
}
}
}

Expand All @@ -92,7 +97,7 @@ func (s *publisherService) UpdateTopic(ctx context.Context, req *pb.UpdateTopicR
return nil, err
}

updated := req.GetTopic()
updated := ProtoClone(existing)
updated.Name = name.String()

// Required. The update mask applies to the resource.
Expand Down
22 changes: 22 additions & 0 deletions mockgcp/mockpubsub/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mockpubsub

import "google.golang.org/protobuf/proto"

// ProtoClone is a type-safe wrapper around proto.Clone
func ProtoClone[T proto.Message](t T) T {
return proto.Clone(t).(T)
}
Loading

0 comments on commit 396428b

Please sign in to comment.