diff --git a/README.md b/README.md index abc6cf5..71b81ef 100644 --- a/README.md +++ b/README.md @@ -62,21 +62,24 @@ resources: Some configuration parameters can be defined by flags that can be passed to the controller. They are described in the following table: -| Name | Description | Default | -|:-------------------------------|:-------------------------------------------------------------------------------|:----------------------:| -| `--metrics-bind-address` | The address the metric endpoint binds to.
0 disables the server | `0` | -| `--health-probe-bind-address` | he address the probe endpoint binds to | `:8081` | -| `--leader-elect` | Enable leader election for controller manager | `false` | -| `--metrics-secure` | If set the metrics endpoint is served securely | `false` | -| `--enable-http2` | If set, HTTP/2 will be enabled for the metrirs | `false` | -| `--webhook-client-hostname` | The hostname used by Kubernetes when calling the webhooks server | `webhooks.admitik.svc` | -| `--webhook-client-port` | The port used by Kubernetes when calling the webhooks server | `10250` | -| `--webhook-client-timeout` | The seconds until timout waited by Kubernetes when calling the webhooks server | `10` | -| `--webhook-server-port` | The port where the webhooks server listens | `10250` | -| `--webhook-server-path` | The path where the webhooks server listens | `/validate` | -| `--webhook-server-ca` | The CA bundle to use for the webhooks server | `-` | -| `--webhook-server-certificate` | The Certificate used by webhooks server | `-` | -| `--webhook-server-private-key` | The Private Key used by webhooks server | `-` | +| Name | Description | Default | +|:---------------------------------------|:-------------------------------------------------------------------------------|:----------------------:| +| `--metrics-bind-address` | The address the metric endpoint binds to.
0 disables the server | `0` | +| `--health-probe-bind-address` | he address the probe endpoint binds to | `:8081` | +| `--leader-elect` | Enable leader election for controller manager | `false` | +| `--metrics-secure` | If set the metrics endpoint is served securely | `false` | +| `--enable-http2` | If set, HTTP/2 will be enabled for the metrirs | `false` | +| `--sources-time-to-resync-informers` | Interval to resynchronize all resources in the informers | `60s` | +| `--sources-time-to-reconcile-watchers` | Time between each reconciliation loop of the watchers | `10s` | +| `--sources-time-to-ack-watcher` | Wait time before marking a watcher as acknowledged (ACK) after it starts | `2s` | +| `--webhook-client-hostname` | The hostname used by Kubernetes when calling the webhooks server | `webhooks.admitik.svc` | +| `--webhook-client-port` | The port used by Kubernetes when calling the webhooks server | `10250` | +| `--webhook-client-timeout` | The seconds until timout waited by Kubernetes when calling the webhooks server | `10` | +| `--webhook-server-port` | The port where the webhooks server listens | `10250` | +| `--webhook-server-path` | The path where the webhooks server listens | `/validate` | +| `--webhook-server-ca` | The CA bundle to use for the webhooks server | `-` | +| `--webhook-server-certificate` | The Certificate used by webhooks server | `-` | +| `--webhook-server-private-key` | The Private Key used by webhooks server | `-` | ## Examples diff --git a/cmd/main.go b/cmd/main.go index 3d38d3f..fae924a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "strings" + "time" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -41,10 +42,11 @@ import ( // "freepik.com/admitik/api/v1alpha1" + "freepik.com/admitik/internal/admission" "freepik.com/admitik/internal/certificates" "freepik.com/admitik/internal/controller" "freepik.com/admitik/internal/globals" - "freepik.com/admitik/internal/xyz" + "freepik.com/admitik/internal/sources" // +kubebuilder:scaffold:imports ) @@ -70,6 +72,10 @@ func main() { var enableHTTP2 bool // Custom flags from here + var sourcesTimeToResyncInformers time.Duration + var sourcesTimeToReconcileWatchers time.Duration + var sourcesTimeToAckWatcher time.Duration + var webhooksClientHostname string var webhooksClientPort int var webhooksClientTimeout int @@ -95,6 +101,13 @@ func main() { "If set, HTTP/2 will be enabled for the metrics and webhook servers") // Custom flags from here + flag.DurationVar(&sourcesTimeToResyncInformers, "sources-time-to-resync-informers", 60*time.Second, + "Interval to resynchronize all resources in the informers") + flag.DurationVar(&sourcesTimeToReconcileWatchers, "sources-time-to-reconcile-watchers", 10*time.Second, + "Time between each reconciliation loop of the watchers") + flag.DurationVar(&sourcesTimeToAckWatcher, "sources-time-to-ack-watcher", 2*time.Second, + "Wait time before marking a watcher as acknowledged (ACK) after it starts") + flag.StringVar(&webhooksClientHostname, "webhook-client-hostname", "webhooks.admitik.svc", "The hostname used by Kubernetes when calling the webhooks server") flag.IntVar(&webhooksClientPort, "webhook-client-port", 10250, @@ -318,6 +331,22 @@ func main() { os.Exit(1) } + // Init SourcesController. + // This controller is in charge of launching watchers to cache sources expressed in some CRs in background. + // This way we avoid retrieving them from Kubernetes on each request to the Admission/Mutation controllers. + sourcesController := sources.SourcesController{ + Client: globals.Application.KubeRawClient, + Options: sources.SourcesControllerOptions{ + InformerDurationToResync: sourcesTimeToResyncInformers, + WatchersDurationBetweenReconcileLoops: sourcesTimeToReconcileWatchers, + WatcherDurationToAck: sourcesTimeToAckWatcher, + }, + } + + setupLog.Info("starting sources controller") + globals.Application.SourceController = &sourcesController + go sourcesController.Start(globals.Application.Context) + // Init primary controller // ATTENTION: This controller may be replaced by a custom one in the future doing the same tasks // to simplify this project's dependencies and maintainability @@ -344,9 +373,9 @@ func main() { } // Init secondary controller to process coming events - workloadController := xyz.WorkloadController{ + admissionController := admission.AdmissionController{ Client: mgr.GetClient(), - Options: xyz.WorkloadControllerOptions{ + Options: admission.AdmissionControllerOptions{ // ServerAddr: "0.0.0.0", @@ -359,8 +388,8 @@ func main() { }, } - setupLog.Info("starting workload controller") - go workloadController.Start(globals.Application.Context) + setupLog.Info("starting admission controller") + go admissionController.Start(globals.Application.Context) // setupLog.Info("starting manager") diff --git a/internal/xyz/workload_controller.go b/internal/admission/controller.go similarity index 78% rename from internal/xyz/workload_controller.go rename to internal/admission/controller.go index ebe6c30..6c6c782 100644 --- a/internal/xyz/workload_controller.go +++ b/internal/admission/controller.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package xyz +package admission import ( "context" @@ -32,12 +32,12 @@ import ( const ( // - controllerContextFinishedMessage = "xyz.WorkloadController finished by context" + controllerContextFinishedMessage = "admission.AdmissionController finished by context" ) -// WorkloadControllerOptions represents available options that can be passed -// to WorkloadController on start -type WorkloadControllerOptions struct { +// AdmissionControllerOptions represents available options that can be passed +// to AdmissionController on start +type AdmissionControllerOptions struct { // ServerAddr string ServerPort int @@ -48,19 +48,19 @@ type WorkloadControllerOptions struct { TLSPrivateKey string } -// WorkloadController represents the controller that triggers parallel threads. +// AdmissionController represents the controller that triggers parallel threads. // These threads process coming events against the conditions defined in Notification CRs // Each thread is a watcher in charge of a group of resources GVRNN (Group + Version + Resource + Namespace + Name) -type WorkloadController struct { +type AdmissionController struct { Client client.Client // - Options WorkloadControllerOptions + Options AdmissionControllerOptions } -// Start launches the XYZ.WorkloadController and keeps it alive +// Start launches the AdmissionController and keeps it alive // It kills the controller on application context death, and rerun the process when failed -func (r *WorkloadController) Start(ctx context.Context) { +func (r *AdmissionController) Start(ctx context.Context) { logger := log.FromContext(ctx) for { @@ -79,7 +79,7 @@ func (r *WorkloadController) Start(ctx context.Context) { } // runWebserver prepares and runs the HTTP server -func (r *WorkloadController) runWebserver() (err error) { +func (r *AdmissionController) runWebserver() (err error) { customServer := NewHttpServer() diff --git a/internal/xyz/server.go b/internal/admission/server.go similarity index 86% rename from internal/xyz/server.go rename to internal/admission/server.go index 5ca49ae..6e9e7ae 100644 --- a/internal/xyz/server.go +++ b/internal/admission/server.go @@ -1,4 +1,20 @@ -package xyz +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admission import ( "context" @@ -9,16 +25,17 @@ import ( "slices" "time" + // "freepik.com/admitik/api/v1alpha1" "freepik.com/admitik/internal/globals" "freepik.com/admitik/internal/template" + // admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -149,7 +166,7 @@ func (s *HttpServer) handleRequest(response http.ResponseWriter, request *http.R // Retrieve the sources declared per policy for sourceIndex, sourceItem := range caPolicyObj.Spec.Sources { - unstructuredSourceObjList, err := getKubeResourceList(request.Context(), + unstructuredSourceObjList, err := getSourcesFromPool( sourceItem.Group, sourceItem.Version, sourceItem.Resource, @@ -167,8 +184,8 @@ func (s *HttpServer) handleRequest(response http.ResponseWriter, request *http.R tmpSources = make(map[int][]map[string]interface{}) } - for _, unstructuredItem := range unstructuredSourceObjList.Items { - tmpSources[sourceIndex] = append(tmpSources[sourceIndex], unstructuredItem.Object) + for _, unstructuredItem := range unstructuredSourceObjList { + tmpSources[sourceIndex] = append(tmpSources[sourceIndex], (*unstructuredItem).Object) } specificTemplateInjectedObject["sources"] = tmpSources @@ -221,34 +238,16 @@ func (s *HttpServer) handleRequest(response http.ResponseWriter, request *http.R reviewResponse.Response.Result = &metav1.Status{} } -// getKubeResourceList returns an unstructuredList of resources selected by params -func getKubeResourceList(ctx context.Context, group, version, resource, namespace, name string) ( - resourceList *unstructured.UnstructuredList, err error) { +// getSourcesFromPool returns a list of unstructured resources selected by params from the sources cache +func getSourcesFromPool(group, version, resource, namespace, name string) ( + resourceList []*unstructured.Unstructured, err error) { - unstructuredSourceObj := globals.Application.KubeRawClient.Resource(schema.GroupVersionResource{ - Group: group, - Version: version, - Resource: resource, - }) - - sourceListOptions := metav1.ListOptions{} - - if namespace != "" { - sourceListOptions.FieldSelector = fmt.Sprintf("metadata.namespace=%s", namespace) - } - - if name != "" { - if sourceListOptions.FieldSelector != "" { - sourceListOptions.FieldSelector += "," - } - sourceListOptions.FieldSelector = fmt.Sprintf("metadata.name=%s", name) - } + sourceString := fmt.Sprintf("%s/%s/%s/%s/%s", group, version, resource, namespace, name) - resourceList, err = unstructuredSourceObj.List(ctx, sourceListOptions) - return resourceList, err + return globals.Application.SourceController.GetWatcherResources(sourceString) } -// createKubeEvent TODO +// createKubeEvent creates a modern event in Kuvernetes with data given by params func createKubeEvent(ctx context.Context, namespace string, object map[string]interface{}, policy v1alpha1.ClusterAdmissionPolicy, action, message string) (err error) { diff --git a/internal/xyz/utils.go b/internal/admission/utils.go similarity index 52% rename from internal/xyz/utils.go rename to internal/admission/utils.go index f1eb4dc..4c9907e 100644 --- a/internal/xyz/utils.go +++ b/internal/admission/utils.go @@ -1,4 +1,20 @@ -package xyz +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admission import ( "errors" diff --git a/internal/certificates/certificates.go b/internal/certificates/certificates.go index d53c55e..f70eda3 100644 --- a/internal/certificates/certificates.go +++ b/internal/certificates/certificates.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package certificates import ( diff --git a/internal/controller/clusteradmissionpolicy_status.go b/internal/controller/clusteradmissionpolicy_status.go index e62bd68..01e82dd 100644 --- a/internal/controller/clusteradmissionpolicy_status.go +++ b/internal/controller/clusteradmissionpolicy_status.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package controller import ( diff --git a/internal/controller/clusteradmissionpolicy_sync.go b/internal/controller/clusteradmissionpolicy_sync.go index 938567b..a9b5fd5 100644 --- a/internal/controller/clusteradmissionpolicy_sync.go +++ b/internal/controller/clusteradmissionpolicy_sync.go @@ -1,3 +1,21 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// TODO: Decouple this controller from 'globals' package + package controller import ( @@ -6,6 +24,7 @@ import ( "slices" "strings" + // "freepik.com/admitik/api/v1alpha1" "freepik.com/admitik/internal/globals" @@ -22,7 +41,8 @@ const ( ) var ( - AdmissionOperations = []admissionregv1.OperationType{admissionregv1.Create, admissionregv1.Update, admissionregv1.Delete, admissionregv1.Connect} + AdmissionOperations = []admissionregv1.OperationType{ + admissionregv1.Create, admissionregv1.Update, admissionregv1.Delete, admissionregv1.Connect} // ValidatingWebhookConfigurationRuleScopeAll = admissionregv1.ScopeType("*") @@ -108,6 +128,48 @@ func (r *ClusterAdmissionPolicyReconciler) SyncAdmissionPool(ctx context.Context } } + // Craft ValidatingWebhookConfiguration rules based on the previous existing one and current pool keys + metaWebhookObj, err := r.getMergedValidatingWebhookConfiguration(ctx) + if err != nil { + err = fmt.Errorf("error building ValidatingWebhookConfiguration '%s': %s", + ValidatingWebhookConfigurationName, err.Error()) + return + } + + // Sync changes to Kubernetes + if errors.IsNotFound(err) { + err = r.Create(ctx, &metaWebhookObj) + if err != nil { + err = fmt.Errorf("error creating ValidatingWebhookConfiguration in Kubernetes'%s': %s", + ValidatingWebhookConfigurationName, err.Error()) + return + } + } else { + err = r.Update(ctx, &metaWebhookObj) + if err != nil { + err = fmt.Errorf("error updating ValidatingWebhookConfiguration in Kubernetes '%s': %s", + ValidatingWebhookConfigurationName, err.Error()) + return + } + } + + // Ask SourcesController to watch all the sources + watchersList := r.getAllSourcesReferences() + + err = globals.Application.SourceController.SyncWatchers(watchersList) + if err != nil { + err = fmt.Errorf("error syncing watchers: %s", err.Error()) + return + } + + return nil +} + +// getValidatingWebhookConfiguration return a ValidatingWebhookConfiguration object that is built based on +// previous existing one in Kubernetes and the current pool keys extracted from ClusterAdmissionPolicy.spec.watchedResources +func (r *ClusterAdmissionPolicyReconciler) getMergedValidatingWebhookConfiguration(ctx context.Context) ( + vwConfig admissionregv1.ValidatingWebhookConfiguration, err error) { + // Craft ValidatingWebhookConfiguration rules based on the pool keys currentVwcRules := []admissionregv1.RuleWithOperations{} for resourcePattern, _ := range globals.Application.ClusterAdmissionPolicyPool.Pool { @@ -154,7 +216,6 @@ func (r *ClusterAdmissionPolicyReconciler) SyncAdmissionPool(ctx context.Context tmpWebhookObj.ClientConfig = r.Options.WebhookClientConfig tmpWebhookObj.Rules = currentVwcRules tmpWebhookObj.TimeoutSeconds = &timeoutSecondsConverted - //tmpWebhookObj.MatchConditions = object.Spec.WatchedResources.MatchConditions sideEffectsClass := admissionregv1.SideEffectClass(admissionregv1.SideEffectClassNone) tmpWebhookObj.SideEffects = &sideEffectsClass @@ -162,22 +223,38 @@ func (r *ClusterAdmissionPolicyReconciler) SyncAdmissionPool(ctx context.Context // Replace the webhooks section in the ValidatingWebhookConfiguration metaWebhookObj.Webhooks = []admissionregv1.ValidatingWebhook{tmpWebhookObj} - // Sync changes to Kubernetes - if errors.IsNotFound(err) { - err = r.Create(ctx, &metaWebhookObj) - if err != nil { - err = fmt.Errorf("error creating ValidatingWebhookConfiguration '%s': %s", - ValidatingWebhookConfigurationName, err.Error()) - return - } - } else { - err = r.Update(ctx, &metaWebhookObj) - if err != nil { - err = fmt.Errorf("error updating ValidatingWebhookConfiguration '%s': %s", - ValidatingWebhookConfigurationName, err.Error()) - return + return metaWebhookObj, nil +} + +// getAllSourcesReferences iterates over all the ClusterAdmissionPolicy objects and +// create a list with all the sources in the format GVRNN (Group/Version/Resource/Namespace/Name) +func (r *ClusterAdmissionPolicyReconciler) getAllSourcesReferences() (references []string) { + + globals.Application.ClusterAdmissionPolicyPool.Mutex.Lock() + defer globals.Application.ClusterAdmissionPolicyPool.Mutex.Unlock() + + // + for _, capList := range globals.Application.ClusterAdmissionPolicyPool.Pool { + for _, capObject := range capList { + for _, capObjSource := range capObject.Spec.Sources { + + sourceString := fmt.Sprintf("%s/%s/%s/%s/%s", + capObjSource.Group, + capObjSource.Version, + capObjSource.Resource, + capObjSource.Namespace, + capObjSource.Namespace, + ) + + if slices.Contains(references, sourceString) { + continue + } + + references = append(references, sourceString) + + } } } - return nil + return references } diff --git a/internal/controller/commons.go b/internal/controller/commons.go index b4a2cbc..4128d50 100644 --- a/internal/controller/commons.go +++ b/internal/controller/commons.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package controller import ( diff --git a/internal/globals/conditions.go b/internal/globals/conditions.go index acbcf8c..d95d724 100644 --- a/internal/globals/conditions.go +++ b/internal/globals/conditions.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package globals import ( diff --git a/internal/globals/globals.go b/internal/globals/globals.go index cf607af..96711d7 100644 --- a/internal/globals/globals.go +++ b/internal/globals/globals.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package globals import ( diff --git a/internal/globals/pools.go b/internal/globals/pools.go index 74d3009..741d8b3 100644 --- a/internal/globals/pools.go +++ b/internal/globals/pools.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package globals import "freepik.com/admitik/api/v1alpha1" diff --git a/internal/globals/types.go b/internal/globals/types.go index 8668e2e..c412ed7 100644 --- a/internal/globals/types.go +++ b/internal/globals/types.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package globals import ( @@ -10,6 +26,7 @@ import ( // "freepik.com/admitik/api/v1alpha1" + "freepik.com/admitik/internal/sources" ) // ClusterAdmissionPolicyPoolT represents TODO @@ -25,12 +42,13 @@ type applicationT struct { // Context TODO Context context.Context - // KubeRawClient TODO - KubeRawClient *dynamic.DynamicClient - - // KubeRawCoreClient TODO + // Kubernetes clients + KubeRawClient *dynamic.DynamicClient KubeRawCoreClient *kubernetes.Clientset + // + SourceController *sources.SourcesController + // ClusterAdmissionPolicyPool ClusterAdmissionPolicyPoolT } diff --git a/internal/globals/utils.go b/internal/globals/utils.go index 35f6c2a..725f6fa 100644 --- a/internal/globals/utils.go +++ b/internal/globals/utils.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package globals import ( @@ -5,12 +21,12 @@ import ( // "os" + // "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ctrl "sigs.k8s.io/controller-runtime" - // ) // NewKubernetesClient return a new Kubernetes Dynamic client from client-go SDK diff --git a/internal/sources/controller.go b/internal/sources/controller.go new file mode 100644 index 0000000..d7c4f75 --- /dev/null +++ b/internal/sources/controller.go @@ -0,0 +1,245 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sources + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + // + + // + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// SourcesControllerOptions TODO +type SourcesControllerOptions struct { + + // Duration to wait until resync all the objects + InformerDurationToResync time.Duration + + // WatchersDurationBetweenReconcileLoops is the duration to wait between the moment + // of launching watchers, and repeating this process (avoid the spam, mate) + WatchersDurationBetweenReconcileLoops time.Duration + + // WatcherDurationToAck is the duration before checking whether a watcher + // is started or not during watchers' reconciling process + WatcherDurationToAck time.Duration +} + +// SourcesControllerOptions represents the controller that triggers parallel watchers. +// These watchers are in charge of maintaining the pool of sources asked by the user in ClusterAdmissionPolicy objects. +// A source group is represented by GVRNN (Group + Version + Resource + Namespace + Name) +type SourcesController struct { + // Kubernetes clients + Client *dynamic.DynamicClient + + // options to modify the behavior of this SourcesController + Options SourcesControllerOptions + + // Carried stuff + watcherPool WatcherPoolT +} + +// TODO +func (r *SourcesController) init() { + r.watcherPool = WatcherPoolT{ + Mutex: &sync.RWMutex{}, + Pool: map[resourceTypeName]*resourceTypeWatcherT{}, + } +} + +// Start launches the SourcesController and keeps it alive +// It kills the controller on application context death, and rerun the process when failed +func (r *SourcesController) Start(ctx context.Context) { + logger := log.FromContext(ctx) + + r.init() + + for { + select { + case <-ctx.Done(): + logger.Info("SourcesController finished by context") + return + default: + r.reconcileWatchers(ctx) + } + + time.Sleep(2 * time.Second) + } +} + +// reconcileWatchers launches a parallel process that launches +// watchers for resource types defined into the WatcherPool +func (r *SourcesController) reconcileWatchers(ctx context.Context) { + logger := log.FromContext(ctx) + + for resourceType, resourceTypeWatcher := range r.watcherPool.Pool { + + // TODO: Is this really needed or useful? + // Check the existence of the resourceType into the WatcherPool. + // Remember the controller.ClusterAdmissionPolicyController can remove watchers on garbage collection + if _, resourceTypeFound := r.watcherPool.Pool[resourceType]; !resourceTypeFound { + continue + } + + // Prevent blocked watchers from being started. + // Remember the controller.ClusterAdmissionPolicyController blocks them during garbage collection + if *resourceTypeWatcher.Blocked { + continue + } + + if !*resourceTypeWatcher.Started { + go r.watchType(ctx, resourceType) + + // Wait for the resourceType watcher to ACK itself into WatcherPool + time.Sleep(r.Options.WatcherDurationToAck) + if !*(r.watcherPool.Pool[resourceType].Started) { + logger.Info(fmt.Sprintf("Impossible to start watcher for resource type: %s", resourceType)) + } + } + + // Wait a bit to reduce the spam to machine resources + time.Sleep(r.Options.WatchersDurationBetweenReconcileLoops) + } +} + +// watchType launches a watcher for a certain resource type, and trigger processing for each entering resource event +func (r *SourcesController) watchType(ctx context.Context, watchedType resourceTypeName) { + + logger := log.FromContext(ctx) + + logger.Info(fmt.Sprintf("Watcher for '%s' has been started", watchedType)) + + // Set ACK flag for watcher launching into the WatcherPool + *(r.watcherPool.Pool[watchedType].Started) = true + defer func() { + *(r.watcherPool.Pool[watchedType].Started) = false + }() + + // Extract GVR + Namespace + Name from watched type: + // {group}/{version}/{resource}/{namespace}/{name} + GVRNN := strings.Split(string(watchedType), "/") + if len(GVRNN) != 5 { + logger.Info("Failed to parse GVR from resourceType. Does it look like {group}/{version}/{resource}?") + return + } + resourceGVR := schema.GroupVersionResource{ + Group: GVRNN[0], + Version: GVRNN[1], + Resource: GVRNN[2], + } + + // Include the namespace when defined by the user (used as filter) + namespace := corev1.NamespaceAll + if GVRNN[3] != "" { + namespace = GVRNN[3] + } + + // Include the name when defined by the user (used as filter) + name := GVRNN[4] + + var listOptionsFunc dynamicinformer.TweakListOptionsFunc = func(options *metav1.ListOptions) {} + if name != "" { + listOptionsFunc = func(options *metav1.ListOptions) { + options.FieldSelector = "metadata.name=" + name + } + } + + // Listen to stop signal to kill this watcher just in case it's needed + stopCh := make(chan struct{}) + + go func() { + <-*(r.watcherPool.Pool[watchedType].StopSignal) + close(stopCh) + logger.Info(fmt.Sprintf("Watcher for resource type '%s' killed by StopSignal", watchedType)) + }() + + // Define our informer + factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(r.Client, + r.Options.InformerDurationToResync, namespace, listOptionsFunc) + + // Create an informer. This is a special type of client-go watcher that includes + // mechanisms to hide disconnections, handle reconnections, and cache watched objects + informer := factory.ForResource(resourceGVR).Informer() + + // Register functions to handle different types of events + handlers := cache.ResourceEventHandlerFuncs{ + + AddFunc: func(eventObject interface{}) { + convertedObject := eventObject.(*unstructured.Unstructured) + + err := r.createWatcherResource(watchedType, convertedObject) + if err != nil { + logger.WithValues( + "watcher", watchedType, + "object", convertedObject.GetNamespace()+"/"+convertedObject.GetName(), + ).Error(err, "Error creating resource in resource list") + return + } + }, + UpdateFunc: func(_, eventObject interface{}) { + convertedObject := eventObject.(*unstructured.Unstructured) + + objectIndex := r.getWatcherResourceIndex(watchedType, convertedObject) + if objectIndex > -1 { + + err := r.updateWatcherResourceByIndex(watchedType, objectIndex, convertedObject) + if err != nil { + logger.WithValues( + "watcher", watchedType, + "object", convertedObject.GetNamespace()+"/"+convertedObject.GetName(), + ).Error(err, "Error updating resource in resource list") + return + } + } + }, + DeleteFunc: func(eventObject interface{}) { + convertedObject := eventObject.(*unstructured.Unstructured) + objectIndex := r.getWatcherResourceIndex(watchedType, convertedObject) + + if objectIndex > -1 { + err := r.deleteWatcherResourceByIndex(watchedType, objectIndex) + if err != nil { + logger.WithValues( + "watcher", watchedType, + "object", convertedObject.GetNamespace()+"/"+convertedObject.GetName(), + ).Error(err, "Error deleting resource from resource list") + return + } + } + }, + } + + _, err := informer.AddEventHandler(handlers) + if err != nil { + logger.Error(err, "Error adding handling functions for events to an informer") + return + } + + informer.Run(stopCh) +} diff --git a/internal/sources/types.go b/internal/sources/types.go new file mode 100644 index 0000000..94511af --- /dev/null +++ b/internal/sources/types.go @@ -0,0 +1,52 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sources + +import ( + "sync" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// TODO +type resourceTypeName string + +// TODO +type resourceTypeWatcherT struct { + // Enforce concurrency safety + Mutex *sync.RWMutex + + // Started represents a flag to know if the watcher is running + Started *bool + + // Blocked represents a flag to prevent watcher from starting + Blocked *bool + + // StopSignal represents a flag to kill the watcher. + // Watcher will be potentially re-launched by SourcesController + StopSignal *chan bool + + // + ResourceList []*unstructured.Unstructured +} + +type WatcherPoolT struct { + // Enforce concurrency safety + Mutex *sync.RWMutex + + Pool map[resourceTypeName]*resourceTypeWatcherT +} diff --git a/internal/sources/utils.go b/internal/sources/utils.go new file mode 100644 index 0000000..fa718bc --- /dev/null +++ b/internal/sources/utils.go @@ -0,0 +1,273 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sources + +import ( + "errors" + "fmt" + "sync" + "time" + + // + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// prepareWatcher scaffolds a new watcher in the WatchedPool. +// This prepares the field for later watchers' reconciliation process. +// That process will create the real Kubernetes informer for this object +// This function is not responsible for blocking the pool before being executed +func (r *SourcesController) prepareWatcher(watcherType resourceTypeName) { + started := false + blocked := false + stopSignal := make(chan bool) + mutex := &sync.RWMutex{} + + r.watcherPool.Pool[watcherType] = &resourceTypeWatcherT{ + Mutex: mutex, + Started: &started, + Blocked: &blocked, + StopSignal: &stopSignal, + ResourceList: make([]*unstructured.Unstructured, 0), + } +} + +// disableWatcher disables a watcher from the WatcherPool. +// It first blocks the watcher to prevent it from being started by any controller, +// then, the watcher is stopped and resources are deleted. +// This function is not responsible for blocking the pool before being executed +func (r *SourcesController) disableWatcher(watcherType resourceTypeName) (result bool) { + + // 1. Prevent watcher from being started again + *r.watcherPool.Pool[watcherType].Blocked = true + + // 2. Stop the watcher + *r.watcherPool.Pool[watcherType].StopSignal <- true + + // 3. Wait for the watcher to be stopped. Return false on failure + stoppedWatcher := false + for i := 0; i < 10; i++ { + if !*r.watcherPool.Pool[watcherType].Started { + stoppedWatcher = true + break + } + time.Sleep(1 * time.Second) + } + + if !stoppedWatcher { + return false + } + + r.watcherPool.Pool[watcherType].ResourceList = []*unstructured.Unstructured{} + return true +} + +// SyncWatchers ensures the WatcherPool matches the desired state. +// +// Given a list of desired watchers in GVRNN format (Group/Version/Resource/Namespace/Name), +// this function creates missing watchers, ensures active ones are unblocked, and removes +// any watchers that are no longer needed. +func (r *SourcesController) SyncWatchers(watcherTypeList []string) (err error) { + + // 0. Check if WatcherPool is ready to work + if r.watcherPool.Mutex == nil { + return fmt.Errorf("watcher pool is not ready") + } + + // 1. Small conversions to gain performance on huge watchers lists + desiredWatchers := make(map[resourceTypeName]struct{}, len(watcherTypeList)) + for _, watcherType := range watcherTypeList { + desiredWatchers[resourceTypeName(watcherType)] = struct{}{} + } + + // 2. Keep or create desired watchers + for watcherType := range desiredWatchers { + + // Lock the WatcherPool mutex for reading + r.watcherPool.Mutex.RLock() + watcher, exists := r.watcherPool.Pool[watcherType] + r.watcherPool.Mutex.RUnlock() + + if !exists { + // Lock the watcher's mutex for writing + r.watcherPool.Mutex.Lock() + r.prepareWatcher(watcherType) + r.watcherPool.Mutex.Unlock() + continue + } + + // Ensure the watcher is NOT blocked + watcher.Mutex.Lock() + if !*watcher.Started { + *watcher.Blocked = false + } + watcher.Mutex.Unlock() + } + + // 3. Clean undesired watchers + r.watcherPool.Mutex.RLock() + existingWatchers := make([]resourceTypeName, 0, len(r.watcherPool.Pool)) + for watcherType := range r.watcherPool.Pool { + existingWatchers = append(existingWatchers, watcherType) + } + r.watcherPool.Mutex.RUnlock() + + for _, watcherType := range existingWatchers { + if _, needed := desiredWatchers[watcherType]; !needed { + // Lock WatcherPool to access the watcher + r.watcherPool.Mutex.RLock() + watcher := r.watcherPool.Pool[watcherType] + r.watcherPool.Mutex.RUnlock() + + watcher.Mutex.Lock() + watcherDisabled := r.disableWatcher(watcherType) + watcher.Mutex.Unlock() + + if !watcherDisabled { + err = errors.Join(err, fmt.Errorf("imposible to disable watcher for: %s", watcherType)) + } + + // Delete the watcher from the WatcherPool + r.watcherPool.Mutex.Lock() + delete(r.watcherPool.Pool, watcherType) + r.watcherPool.Mutex.Unlock() + } + } + + return err +} + +// GetWatcherResources accept a desired watcher in the GVRNN format (Group/Version/Resource/Namespace/Name) +// and returns a list of resources matching it +func (r *SourcesController) GetWatcherResources(watcherType string) (resources []*unstructured.Unstructured, err error) { + + // 0. Check if WatcherPool is ready to work + if r.watcherPool.Mutex == nil { + return resources, fmt.Errorf("watcher pool is not ready") + } + + // Lock the WatcherPool mutex for reading + r.watcherPool.Mutex.RLock() + watcher, watcherTypeFound := r.watcherPool.Pool[resourceTypeName(watcherType)] + r.watcherPool.Mutex.RUnlock() + + if !watcherTypeFound { + return nil, fmt.Errorf("watcher type '%s' not found. Is the watcher created?", watcherType) + } + + // Lock the watcher's mutex for reading + watcher.Mutex.RLock() + defer watcher.Mutex.RUnlock() + + // Return the pointer to the ResourceList + return watcher.ResourceList, nil +} + +// createWatcherResource TODO +func (r *SourcesController) createWatcherResource(watcherType resourceTypeName, resource *unstructured.Unstructured) error { + // Lock the WatcherPool mutex for reading + r.watcherPool.Mutex.RLock() + watcher, exists := r.watcherPool.Pool[watcherType] + r.watcherPool.Mutex.RUnlock() + + if !exists { + return fmt.Errorf("watcher type '%s' not found. Is the watcher created?", watcherType) + } + + // Lock the watcher's mutex for writing + watcher.Mutex.Lock() + defer watcher.Mutex.Unlock() + + temporaryManifest := resource.DeepCopy() + watcher.ResourceList = append(watcher.ResourceList, temporaryManifest) + + return nil +} + +// TODO +func (r *SourcesController) getWatcherResourceIndex(watcherType resourceTypeName, resource *unstructured.Unstructured) int { + // Lock the WatcherPool mutex for reading + r.watcherPool.Mutex.RLock() + watcher, exists := r.watcherPool.Pool[watcherType] + r.watcherPool.Mutex.RUnlock() + + if !exists { + return -1 + } + + // Lock the watcher's mutex for reading + watcher.Mutex.RLock() + defer watcher.Mutex.RUnlock() + + for index, tmpResource := range watcher.ResourceList { + if tmpResource.GetName() == resource.GetName() && + tmpResource.GetNamespace() == resource.GetNamespace() { + return index + } + } + + return -1 +} + +// TODO +func (r *SourcesController) updateWatcherResourceByIndex(watcherType resourceTypeName, resourceIndex int, resource *unstructured.Unstructured) error { + // Lock the WatcherPool mutex for reading + r.watcherPool.Mutex.RLock() + watcher, exists := r.watcherPool.Pool[watcherType] + r.watcherPool.Mutex.RUnlock() + + if !exists { + return fmt.Errorf("watcher type '%s' not found", watcherType) + } + + // Lock the watcher's mutex for writing + watcher.Mutex.Lock() + defer watcher.Mutex.Unlock() + + if resourceIndex < 0 || resourceIndex >= len((*watcher).ResourceList) { + return fmt.Errorf("resource index out of bounds") + } + + ((*watcher).ResourceList)[resourceIndex] = resource + + return nil +} + +// TODO +func (r *SourcesController) deleteWatcherResourceByIndex(watcherType resourceTypeName, resourceIndex int) error { + // Lock the WatcherPool mutex for reading + r.watcherPool.Mutex.RLock() + watcher, exists := r.watcherPool.Pool[watcherType] + r.watcherPool.Mutex.RUnlock() + + if !exists { + return fmt.Errorf("watcher type '%s' not found", watcherType) + } + + // Lock the watcher's mutex for writing + watcher.Mutex.Lock() + defer watcher.Mutex.Unlock() + + if resourceIndex < 0 || resourceIndex >= len((*watcher).ResourceList) { + return fmt.Errorf("resource index out of bounds") + } + + // Substitute the selected notification object with the last one from the list, + // then replace the whole list with it, minus the last. + (*watcher).ResourceList = append(((*watcher).ResourceList)[:resourceIndex], ((*watcher).ResourceList)[resourceIndex+1:]...) + + return nil +} diff --git a/internal/template/functions.go b/internal/template/functions.go index badc151..20a0038 100644 --- a/internal/template/functions.go +++ b/internal/template/functions.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package template import (