Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mesh: add computed destinations with a controller that computes them #19067

Merged
merged 10 commits into from
Oct 12, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,37 @@ package explicitdestinations

import (
"context"
"fmt"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/mappers/workloadselectionmapper"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/explicitdestinations/mapper"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
)

const ControllerName = "consul.io/explicit-destinations-controller"
const ControllerName = "consul.io/explicit-mapper-controller"

func Controller(destinationsMapper *workloadselectionmapper.Mapper[*pbmesh.Destinations]) controller.Controller {
if destinationsMapper == nil {
panic("destinations mapper is required")
func Controller(mapper *mapper.Mapper) controller.Controller {
if mapper == nil {
panic("mapper is required")
}

return controller.ForType(pbmesh.ComputedExplicitDestinationsType).
WithWatch(pbmesh.DestinationsType, destinationsMapper.MapToComputedType).
WithWatch(pbmesh.DestinationsType, mapper.MapDestinations).
WithWatch(pbcatalog.WorkloadType, controller.ReplaceType(pbmesh.ComputedExplicitDestinationsType)).
WithReconciler(&reconciler{destinations: destinationsMapper})
WithWatch(pbcatalog.ServiceType, mapper.MapService).
WithWatch(pbmesh.ComputedRoutesType, mapper.MapComputedRoute).
WithReconciler(&reconciler{mapper: mapper})
}

type reconciler struct {
destinations *workloadselectionmapper.Mapper[*pbmesh.Destinations]
mapper *mapper.Mapper
}

func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
Expand All @@ -52,22 +55,24 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
// because it is owned by the workload. In this case, we skip reconcile
// because there's nothing for us to do.
rt.Logger.Trace("the corresponding workload does not exist", "id", workloadID)
r.mapper.UntrackComputedExplicitDestinations(req.ID)
return nil
}

// Get existing ComputedExplicitDestinations resource (if any).
ced, err := resource.GetDecodedResource[*pbmesh.ComputedExplicitDestinations](ctx, rt.Client, req.ID)
if err != nil {
rt.Logger.Error("error fetching ComputedDestinations", "error", err)
rt.Logger.Error("error fetching ComputedExplicitDestinations", "error", err)
return err
}

// If workload is not on the mesh, we need to delete the resource and return
// as for non-mesh workloads there should be no destinations.
// as for non-mesh workloads there should be no mapper.
if !workload.GetData().IsMeshEnabled() {
rt.Logger.Trace("workload is not on the mesh, skipping reconcile and deleting any corresponding ComputedDestinations", "id", workloadID)
r.mapper.UntrackComputedExplicitDestinations(req.ID)

// Delete CD only if it exists.
// Delete CED only if it exists.
if ced != nil {
_, err = rt.Client.Delete(ctx, &pbresource.DeleteRequest{Id: req.ID})
if err != nil {
Expand All @@ -81,31 +86,50 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
return nil
}

// Now get any destinations that we have in the cache that have selectors matching the name
// Now get any mapper that we have in the cache that have selectors matching the name
// of this CD (name-aligned with workload).
destinationIDs := r.destinations.IDsForWorkload(req.ID)
destinationIDs := r.mapper.DestinationsForWorkload(req.ID)
rt.Logger.Trace("cached destinations IDs", "ids", destinationIDs)

decodedDestinations, err := r.fetchDestinations(ctx, rt.Client, destinationIDs)
if err != nil {
rt.Logger.Error("error fetching destinations", "error", err)
rt.Logger.Error("error fetching mapper", "error", err)
return err
}

if len(decodedDestinations) > 0 {
r.mapper.TrackDestinations(req.ID, decodedDestinations)
} else {
r.mapper.UntrackComputedExplicitDestinations(req.ID)
}

duplicates := findConflicts(decodedDestinations)
ishustava marked this conversation as resolved.
Show resolved Hide resolved

newComputedDestinationsData := &pbmesh.ComputedExplicitDestinations{}
for _, dst := range decodedDestinations {
valid, cond := validate(ctx, rt.Client, dst)
updatedStatus := &pbresource.Status{
ObservedGeneration: dst.GetResource().GetGeneration(),
}

// Only add it to computed destinations if its destinations are valid.
if valid {
newComputedDestinationsData.Destinations = append(newComputedDestinationsData.Destinations, dst.GetData().GetDestinations()...)
// First check if this resource has a conflict. If it does, update status and move to the next resource.
if _, ok := duplicates[resource.NewReferenceKey(dst.GetResource().GetId())]; ok {
rt.Logger.Trace("skipping this Destinations resource because it has conflicts with others", "id", dst.GetResource().GetId())
updatedStatus.Conditions = append(updatedStatus.Conditions, ConditionConflictFound(workload.GetResource().GetId()))
} else {
valid, cond := validate(ctx, rt.Client, dst)

// Only add it to computed mapper if its mapper are valid.
if valid {
newComputedDestinationsData.Destinations = append(newComputedDestinationsData.Destinations, dst.GetData().GetDestinations()...)
} else {
rt.Logger.Trace("Destinations is not valid", "condition", cond)
}

updatedStatus.Conditions = append(updatedStatus.Conditions, ConditionConflictNotFound, cond)
}

// Write status for this destination.
currentStatus := dst.GetResource().GetStatus()[ControllerName]
updatedStatus := &pbresource.Status{
Conditions: []*pbresource.Condition{cond},
ObservedGeneration: dst.GetResource().GetGeneration(),
}

// If the status is unchanged then we should return and avoid the unnecessary write
if !resource.EqualStatus(currentStatus, updatedStatus, false) {
Expand All @@ -132,7 +156,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
_, err = rt.Client.Delete(ctx, &pbresource.DeleteRequest{Id: req.ID})
if err != nil {
// If there's an error deleting CD, we want to re-trigger reconcile again.
rt.Logger.Error("error deleting ComputedDestinations", "error", err)
rt.Logger.Error("error deleting ComputedExplicitDestinations", "error", err)
return err
}
}
Expand All @@ -142,12 +166,12 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c

ishustava marked this conversation as resolved.
Show resolved Hide resolved
// Lastly, write the resource.
if ced == nil || !proto.Equal(ced.GetData(), newComputedDestinationsData) {
rt.Logger.Trace("writing new ComputedDestinations")
rt.Logger.Trace("writing new ComputedExplicitDestinations")

// First encode the endpoints data as an Any type.
cpcDataAsAny, err := anypb.New(newComputedDestinationsData)
if err != nil {
rt.Logger.Error("error marshalling latest ComputedDestinations", "error", err)
rt.Logger.Error("error marshalling latest ComputedExplicitDestinations", "error", err)
return err
}

Expand All @@ -159,7 +183,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
},
})
if err != nil {
rt.Logger.Error("error writing ComputedDestinations", "error", err)
rt.Logger.Error("error writing ComputedExplicitDestinations", "error", err)
return err
}
}
Expand Down Expand Up @@ -187,7 +211,8 @@ func validate(
return false, ConditionMeshProtocolNotFound(serviceRef)
}

if service.GetData().FindServicePort(dest.DestinationPort).Protocol == pbcatalog.Protocol_PROTOCOL_MESH {
if service.GetData().FindServicePort(dest.DestinationPort) != nil &&
service.GetData().FindServicePort(dest.DestinationPort).Protocol == pbcatalog.Protocol_PROTOCOL_MESH {
return false, ConditionMeshProtocolDestinationPort(serviceRef, dest.DestinationPort)
}

Expand Down Expand Up @@ -225,11 +250,51 @@ func (r *reconciler) fetchDestinations(
}
if res == nil || res.GetResource() == nil || res.GetData() == nil {
// If resource is not found, we should untrack it.
r.destinations.UntrackID(id)
r.mapper.UntrackDestinations(id)
continue
}
decoded = append(decoded, res)
}

return decoded, nil
}

// Find conflicts finds any resources where listen addresses of the destinations are conflicting.
// It will record both resources as conflicting in the resulting map.
func findConflicts(destinations []*types.DecodedDestinations) map[resource.ReferenceKey]struct{} {
addresses := make(map[string]*pbresource.ID)
duplicates := make(map[resource.ReferenceKey]struct{})

for _, decDestinations := range destinations {
for _, dst := range decDestinations.GetData().GetDestinations() {
var address string

switch dst.ListenAddr.(type) {
case *pbmesh.Destination_IpPort:
listenAddr := dst.GetListenAddr().(*pbmesh.Destination_IpPort)
address = fmt.Sprintf("%s:%d", listenAddr.IpPort.GetIp(), listenAddr.IpPort.GetPort())
case *pbmesh.Destination_Unix:
listenAddr := dst.GetListenAddr().(*pbmesh.Destination_Unix)
address = listenAddr.Unix.GetPath()
default:
continue
}

if id, ok := addresses[address]; ok {
// if there's already a listen address for one of the mapper, that means we've found a duplicate.
duplicates[resource.NewReferenceKey(decDestinations.GetResource().GetId())] = struct{}{}

// Also record the original resource as conflicting one.
duplicates[resource.NewReferenceKey(id)] = struct{}{}

// Don't evaluate the rest of mapper in this resource because this resource already has a duplicate.
break
} else {
// Otherwise, record this address.
addresses[address] = decDestinations.GetResource().GetId()
}
}
}

return duplicates
}
Loading