-
Notifications
You must be signed in to change notification settings - Fork 33
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* istio-optional: iteration 2 * fix tests * indexer tests GatewayAPI before create * fix rebase issues * fix tests * fix tests after code refactor
- Loading branch information
Showing
15 changed files
with
505 additions
and
433 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,7 @@ import ( | |
istioclientgoextensionv1alpha1 "istio.io/client-go/pkg/apis/extensions/v1alpha1" | ||
apierrors "k8s.io/apimachinery/pkg/api/errors" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/utils/ptr" | ||
"k8s.io/utils/env" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/handler" | ||
|
@@ -45,12 +45,16 @@ import ( | |
"github.com/kuadrant/kuadrant-operator/pkg/rlptools/wasm" | ||
) | ||
|
||
const ( | ||
HTTPRouteGatewayParentField = ".metadata.parentRefs.gateway" | ||
var ( | ||
WASMFilterImageURL = env.GetString("RELATED_IMAGE_WASMSHIM", "oci://quay.io/kuadrant/wasm-shim:latest") | ||
) | ||
|
||
// RateLimitingWASMPluginReconciler reconciles a WASMPlugin object for rate limiting | ||
type RateLimitingWASMPluginReconciler struct { | ||
func WASMPluginName(gw *gatewayapiv1.Gateway) string { | ||
return fmt.Sprintf("kuadrant-%s", gw.Name) | ||
} | ||
|
||
// RateLimitingIstioWASMPluginReconciler reconciles a WASMPlugin object for rate limiting | ||
type RateLimitingIstioWASMPluginReconciler struct { | ||
*reconcilers.BaseReconciler | ||
} | ||
|
||
|
@@ -61,7 +65,7 @@ type RateLimitingWASMPluginReconciler struct { | |
|
||
// For more details, check Reconcile and its Result here: | ||
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile | ||
func (r *RateLimitingWASMPluginReconciler) Reconcile(eventCtx context.Context, req ctrl.Request) (ctrl.Result, error) { | ||
func (r *RateLimitingIstioWASMPluginReconciler) Reconcile(eventCtx context.Context, req ctrl.Request) (ctrl.Result, error) { | ||
logger := r.Logger().WithValues("Gateway", req.NamespacedName, "request id", uuid.NewString()) | ||
logger.Info("Reconciling rate limiting WASMPlugin") | ||
ctx := logr.NewContext(eventCtx, logger) | ||
|
@@ -89,7 +93,7 @@ func (r *RateLimitingWASMPluginReconciler) Reconcile(eventCtx context.Context, r | |
return ctrl.Result{}, err | ||
} | ||
|
||
err = r.ReconcileResource(ctx, &istioclientgoextensionv1alpha1.WasmPlugin{}, desired, rlptools.WASMPluginMutator) | ||
err = r.ReconcileResource(ctx, &istioclientgoextensionv1alpha1.WasmPlugin{}, desired, kuadrantistioutils.WASMPluginMutator) | ||
if err != nil { | ||
return ctrl.Result{}, err | ||
} | ||
|
@@ -98,7 +102,7 @@ func (r *RateLimitingWASMPluginReconciler) Reconcile(eventCtx context.Context, r | |
return ctrl.Result{}, nil | ||
} | ||
|
||
func (r *RateLimitingWASMPluginReconciler) desiredRateLimitingWASMPlugin(ctx context.Context, gw *gatewayapiv1.Gateway) (*istioclientgoextensionv1alpha1.WasmPlugin, error) { | ||
func (r *RateLimitingIstioWASMPluginReconciler) desiredRateLimitingWASMPlugin(ctx context.Context, gw *gatewayapiv1.Gateway) (*istioclientgoextensionv1alpha1.WasmPlugin, error) { | ||
baseLogger, err := logr.FromContext(ctx) | ||
if err != nil { | ||
return nil, err | ||
|
@@ -110,12 +114,12 @@ func (r *RateLimitingWASMPluginReconciler) desiredRateLimitingWASMPlugin(ctx con | |
APIVersion: "extensions.istio.io/v1alpha1", | ||
}, | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: rlptools.WASMPluginName(gw), | ||
Name: WASMPluginName(gw), | ||
Namespace: gw.Namespace, | ||
}, | ||
Spec: istioextensionsv1alpha1.WasmPlugin{ | ||
TargetRef: kuadrantistioutils.PolicyTargetRefFromGateway(gw), | ||
Url: rlptools.WASMFilterImageURL, | ||
Url: WASMFilterImageURL, | ||
PluginConfig: nil, | ||
// Insert plugin before Istio stats filters and after Istio authorization filters. | ||
Phase: istioextensionsv1alpha1.PluginPhase_STATS, | ||
|
@@ -150,18 +154,18 @@ func (r *RateLimitingWASMPluginReconciler) desiredRateLimitingWASMPlugin(ctx con | |
return wasmPlugin, nil | ||
} | ||
|
||
func (r *RateLimitingWASMPluginReconciler) wasmPluginConfig(ctx context.Context, gw *gatewayapiv1.Gateway) (*wasm.Plugin, error) { | ||
func (r *RateLimitingIstioWASMPluginReconciler) wasmPluginConfig(ctx context.Context, gw *gatewayapiv1.Gateway) (*wasm.Config, error) { | ||
logger, err := logr.FromContext(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
wasmPlugin := &wasm.Plugin{ | ||
config := &wasm.Config{ | ||
FailureMode: wasm.FailureModeDeny, | ||
RateLimitPolicies: make([]wasm.RateLimitPolicy, 0), | ||
} | ||
|
||
t, err := r.topologyIndexesFromGateway(ctx, gw) | ||
t, err := rlptools.TopologyIndexesFromGateway(ctx, r.Client(), gw) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -185,60 +189,13 @@ func (r *RateLimitingWASMPluginReconciler) wasmPluginConfig(ctx context.Context, | |
continue | ||
} | ||
|
||
wasmPlugin.RateLimitPolicies = append(wasmPlugin.RateLimitPolicies, *wasmRLP) | ||
} | ||
|
||
return wasmPlugin, nil | ||
} | ||
|
||
func (r *RateLimitingWASMPluginReconciler) topologyIndexesFromGateway(ctx context.Context, gw *gatewayapiv1.Gateway) (*kuadrantgatewayapi.TopologyIndexes, error) { | ||
logger, err := logr.FromContext(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
routeList := &gatewayapiv1.HTTPRouteList{} | ||
// Get all the routes having the gateway as parent | ||
err = r.Client().List(ctx, routeList, client.MatchingFields{HTTPRouteGatewayParentField: client.ObjectKeyFromObject(gw).String()}) | ||
logger.V(1).Info("topologyIndexesFromGateway: list httproutes from gateway", | ||
"gateway", client.ObjectKeyFromObject(gw), | ||
"#HTTPRoutes", len(routeList.Items), | ||
"err", err) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
rlpList := &kuadrantv1beta2.RateLimitPolicyList{} | ||
// Get all the rate limit policies | ||
err = r.Client().List(ctx, rlpList) | ||
logger.V(1).Info("topologyIndexesFromGateway: list rate limit policies", | ||
"#RLPS", len(rlpList.Items), | ||
"err", err) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
policies := utils.Map(rlpList.Items, func(p kuadrantv1beta2.RateLimitPolicy) kuadrantgatewayapi.Policy { return &p }) | ||
|
||
topology, err := kuadrantgatewayapi.NewTopology( | ||
kuadrantgatewayapi.WithGateways([]*gatewayapiv1.Gateway{gw}), | ||
kuadrantgatewayapi.WithRoutes(utils.Map(routeList.Items, ptr.To[gatewayapiv1.HTTPRoute])), | ||
kuadrantgatewayapi.WithPolicies(policies), | ||
kuadrantgatewayapi.WithLogger(logger), | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
overriddenTopology, err := rlptools.ApplyOverrides(topology, gw) | ||
if err != nil { | ||
return nil, err | ||
config.RateLimitPolicies = append(config.RateLimitPolicies, *wasmRLP) | ||
} | ||
|
||
return kuadrantgatewayapi.NewTopologyIndexes(overriddenTopology), nil | ||
return config, nil | ||
} | ||
|
||
func (r *RateLimitingWASMPluginReconciler) wasmRateLimitPolicy(ctx context.Context, t *kuadrantgatewayapi.TopologyIndexes, rlp *kuadrantv1beta2.RateLimitPolicy, gw *gatewayapiv1.Gateway) (*wasm.RateLimitPolicy, error) { | ||
func (r *RateLimitingIstioWASMPluginReconciler) wasmRateLimitPolicy(ctx context.Context, t *kuadrantgatewayapi.TopologyIndexes, rlp *kuadrantv1beta2.RateLimitPolicy, gw *gatewayapiv1.Gateway) (*wasm.RateLimitPolicy, error) { | ||
route, err := r.routeFromRLP(ctx, t, rlp, gw) | ||
if err != nil { | ||
return nil, err | ||
|
@@ -271,7 +228,7 @@ func (r *RateLimitingWASMPluginReconciler) wasmRateLimitPolicy(ctx context.Conte | |
routeWithEffectiveHostnames := route.DeepCopy() | ||
routeWithEffectiveHostnames.Spec.Hostnames = hostnames | ||
|
||
rules := rlptools.WasmRules(rlp, routeWithEffectiveHostnames) | ||
rules := wasm.Rules(rlp, routeWithEffectiveHostnames) | ||
if len(rules) == 0 { | ||
// no need to add the policy if there are no rules; a rlp can return no rules if all its limits fail to match any route rule | ||
return nil, nil | ||
|
@@ -286,7 +243,7 @@ func (r *RateLimitingWASMPluginReconciler) wasmRateLimitPolicy(ctx context.Conte | |
}, nil | ||
} | ||
|
||
func (r *RateLimitingWASMPluginReconciler) routeFromRLP(ctx context.Context, t *kuadrantgatewayapi.TopologyIndexes, rlp *kuadrantv1beta2.RateLimitPolicy, gw *gatewayapiv1.Gateway) (*gatewayapiv1.HTTPRoute, error) { | ||
func (r *RateLimitingIstioWASMPluginReconciler) routeFromRLP(ctx context.Context, t *kuadrantgatewayapi.TopologyIndexes, rlp *kuadrantv1beta2.RateLimitPolicy, gw *gatewayapiv1.Gateway) (*gatewayapiv1.HTTPRoute, error) { | ||
logger, err := logr.FromContext(ctx) | ||
if err != nil { | ||
return nil, err | ||
|
@@ -325,34 +282,8 @@ func (r *RateLimitingWASMPluginReconciler) routeFromRLP(ctx context.Context, t * | |
return route, nil | ||
} | ||
|
||
// addHTTPRouteByGatewayIndexer declares an index key that we can later use with the client as a pseudo-field name, | ||
// allowing to query all the routes parented by a given gateway | ||
// to prevent creating the same index field multiple times, the function is declared private to be | ||
// called only by this controller | ||
func addHTTPRouteByGatewayIndexer(mgr ctrl.Manager, baseLogger logr.Logger) error { | ||
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &gatewayapiv1.HTTPRoute{}, HTTPRouteGatewayParentField, func(rawObj client.Object) []string { | ||
// grab the route object, extract the parents | ||
route, assertionOk := rawObj.(*gatewayapiv1.HTTPRoute) | ||
if !assertionOk { | ||
baseLogger.V(1).Error(fmt.Errorf("%T is not a *gatewayapiv1.HTTPRoute", rawObj), "cannot map") | ||
return nil | ||
} | ||
|
||
logger := baseLogger.WithValues("route", client.ObjectKeyFromObject(route).String()) | ||
|
||
return utils.Map(kuadrantgatewayapi.GetRouteAcceptedGatewayParentKeys(route), func(key client.ObjectKey) string { | ||
logger.V(1).Info("new gateway added", "key", key.String()) | ||
return key.String() | ||
}) | ||
}); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// SetupWithManager sets up the controller with the Manager. | ||
func (r *RateLimitingWASMPluginReconciler) SetupWithManager(mgr ctrl.Manager) error { | ||
func (r *RateLimitingIstioWASMPluginReconciler) SetupWithManager(mgr ctrl.Manager) error { | ||
ok, err := kuadrantistioutils.IsWASMPluginInstalled(mgr.GetRESTMapper()) | ||
if err != nil { | ||
return err | ||
|
@@ -371,12 +302,6 @@ func (r *RateLimitingWASMPluginReconciler) SetupWithManager(mgr ctrl.Manager) er | |
return nil | ||
} | ||
|
||
// Add custom indexer | ||
err = addHTTPRouteByGatewayIndexer(mgr, r.Logger().WithName("routeByGatewayIndexer")) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
httpRouteToParentGatewaysEventMapper := mappers.NewHTTPRouteToParentGatewaysEventMapper( | ||
mappers.WithLogger(r.Logger().WithName("httpRouteToParentGatewaysEventMapper")), | ||
) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.