Skip to content

Commit

Permalink
Merge pull request #4 from freepik-company/feat/implement-sources-con…
Browse files Browse the repository at this point in the history
…troller

Feat/implement sources controller
  • Loading branch information
achetronic authored Nov 26, 2024
2 parents 29d32dc + 569b02a commit 53d7dd5
Show file tree
Hide file tree
Showing 18 changed files with 923 additions and 83 deletions.
33 changes: 18 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,24 @@ resources:
Some configuration parameters can be defined by flags that can be passed to the controller.
They are described in the following table:
| Name | Description | Default |
|:-------------------------------|:-------------------------------------------------------------------------------|:----------------------:|
| `--metrics-bind-address` | The address the metric endpoint binds to. </br> 0 disables the server | `0` |
| `--health-probe-bind-address` | he address the probe endpoint binds to | `:8081` |
| `--leader-elect` | Enable leader election for controller manager | `false` |
| `--metrics-secure` | If set the metrics endpoint is served securely | `false` |
| `--enable-http2` | If set, HTTP/2 will be enabled for the metrirs | `false` |
| `--webhook-client-hostname` | The hostname used by Kubernetes when calling the webhooks server | `webhooks.admitik.svc` |
| `--webhook-client-port` | The port used by Kubernetes when calling the webhooks server | `10250` |
| `--webhook-client-timeout` | The seconds until timout waited by Kubernetes when calling the webhooks server | `10` |
| `--webhook-server-port` | The port where the webhooks server listens | `10250` |
| `--webhook-server-path` | The path where the webhooks server listens | `/validate` |
| `--webhook-server-ca` | The CA bundle to use for the webhooks server | `-` |
| `--webhook-server-certificate` | The Certificate used by webhooks server | `-` |
| `--webhook-server-private-key` | The Private Key used by webhooks server | `-` |
| Name | Description | Default |
|:---------------------------------------|:-------------------------------------------------------------------------------|:----------------------:|
| `--metrics-bind-address` | The address the metric endpoint binds to. </br> 0 disables the server | `0` |
| `--health-probe-bind-address` | he address the probe endpoint binds to | `:8081` |
| `--leader-elect` | Enable leader election for controller manager | `false` |
| `--metrics-secure` | If set the metrics endpoint is served securely | `false` |
| `--enable-http2` | If set, HTTP/2 will be enabled for the metrirs | `false` |
| `--sources-time-to-resync-informers` | Interval to resynchronize all resources in the informers | `60s` |
| `--sources-time-to-reconcile-watchers` | Time between each reconciliation loop of the watchers | `10s` |
| `--sources-time-to-ack-watcher` | Wait time before marking a watcher as acknowledged (ACK) after it starts | `2s` |
| `--webhook-client-hostname` | The hostname used by Kubernetes when calling the webhooks server | `webhooks.admitik.svc` |
| `--webhook-client-port` | The port used by Kubernetes when calling the webhooks server | `10250` |
| `--webhook-client-timeout` | The seconds until timout waited by Kubernetes when calling the webhooks server | `10` |
| `--webhook-server-port` | The port where the webhooks server listens | `10250` |
| `--webhook-server-path` | The path where the webhooks server listens | `/validate` |
| `--webhook-server-ca` | The CA bundle to use for the webhooks server | `-` |
| `--webhook-server-certificate` | The Certificate used by webhooks server | `-` |
| `--webhook-server-private-key` | The Private Key used by webhooks server | `-` |


## Examples
Expand Down
39 changes: 34 additions & 5 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"path/filepath"
"strings"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand All @@ -41,10 +42,11 @@ import (

//
"freepik.com/admitik/api/v1alpha1"
"freepik.com/admitik/internal/admission"
"freepik.com/admitik/internal/certificates"
"freepik.com/admitik/internal/controller"
"freepik.com/admitik/internal/globals"
"freepik.com/admitik/internal/xyz"
"freepik.com/admitik/internal/sources"
// +kubebuilder:scaffold:imports
)

Expand All @@ -70,6 +72,10 @@ func main() {
var enableHTTP2 bool

// Custom flags from here
var sourcesTimeToResyncInformers time.Duration
var sourcesTimeToReconcileWatchers time.Duration
var sourcesTimeToAckWatcher time.Duration

var webhooksClientHostname string
var webhooksClientPort int
var webhooksClientTimeout int
Expand All @@ -95,6 +101,13 @@ func main() {
"If set, HTTP/2 will be enabled for the metrics and webhook servers")

// Custom flags from here
flag.DurationVar(&sourcesTimeToResyncInformers, "sources-time-to-resync-informers", 60*time.Second,
"Interval to resynchronize all resources in the informers")
flag.DurationVar(&sourcesTimeToReconcileWatchers, "sources-time-to-reconcile-watchers", 10*time.Second,
"Time between each reconciliation loop of the watchers")
flag.DurationVar(&sourcesTimeToAckWatcher, "sources-time-to-ack-watcher", 2*time.Second,
"Wait time before marking a watcher as acknowledged (ACK) after it starts")

flag.StringVar(&webhooksClientHostname, "webhook-client-hostname", "webhooks.admitik.svc",
"The hostname used by Kubernetes when calling the webhooks server")
flag.IntVar(&webhooksClientPort, "webhook-client-port", 10250,
Expand Down Expand Up @@ -318,6 +331,22 @@ func main() {
os.Exit(1)
}

// Init SourcesController.
// This controller is in charge of launching watchers to cache sources expressed in some CRs in background.
// This way we avoid retrieving them from Kubernetes on each request to the Admission/Mutation controllers.
sourcesController := sources.SourcesController{
Client: globals.Application.KubeRawClient,
Options: sources.SourcesControllerOptions{
InformerDurationToResync: sourcesTimeToResyncInformers,
WatchersDurationBetweenReconcileLoops: sourcesTimeToReconcileWatchers,
WatcherDurationToAck: sourcesTimeToAckWatcher,
},
}

setupLog.Info("starting sources controller")
globals.Application.SourceController = &sourcesController
go sourcesController.Start(globals.Application.Context)

// Init primary controller
// ATTENTION: This controller may be replaced by a custom one in the future doing the same tasks
// to simplify this project's dependencies and maintainability
Expand All @@ -344,9 +373,9 @@ func main() {
}

// Init secondary controller to process coming events
workloadController := xyz.WorkloadController{
admissionController := admission.AdmissionController{
Client: mgr.GetClient(),
Options: xyz.WorkloadControllerOptions{
Options: admission.AdmissionControllerOptions{

//
ServerAddr: "0.0.0.0",
Expand All @@ -359,8 +388,8 @@ func main() {
},
}

setupLog.Info("starting workload controller")
go workloadController.Start(globals.Application.Context)
setupLog.Info("starting admission controller")
go admissionController.Start(globals.Application.Context)

//
setupLog.Info("starting manager")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package xyz
package admission

import (
"context"
Expand All @@ -32,12 +32,12 @@ import (
const (

//
controllerContextFinishedMessage = "xyz.WorkloadController finished by context"
controllerContextFinishedMessage = "admission.AdmissionController finished by context"
)

// WorkloadControllerOptions represents available options that can be passed
// to WorkloadController on start
type WorkloadControllerOptions struct {
// AdmissionControllerOptions represents available options that can be passed
// to AdmissionController on start
type AdmissionControllerOptions struct {
//
ServerAddr string
ServerPort int
Expand All @@ -48,19 +48,19 @@ type WorkloadControllerOptions struct {
TLSPrivateKey string
}

// WorkloadController represents the controller that triggers parallel threads.
// AdmissionController represents the controller that triggers parallel threads.
// These threads process coming events against the conditions defined in Notification CRs
// Each thread is a watcher in charge of a group of resources GVRNN (Group + Version + Resource + Namespace + Name)
type WorkloadController struct {
type AdmissionController struct {
Client client.Client

//
Options WorkloadControllerOptions
Options AdmissionControllerOptions
}

// Start launches the XYZ.WorkloadController and keeps it alive
// Start launches the AdmissionController and keeps it alive
// It kills the controller on application context death, and rerun the process when failed
func (r *WorkloadController) Start(ctx context.Context) {
func (r *AdmissionController) Start(ctx context.Context) {
logger := log.FromContext(ctx)

for {
Expand All @@ -79,7 +79,7 @@ func (r *WorkloadController) Start(ctx context.Context) {
}

// runWebserver prepares and runs the HTTP server
func (r *WorkloadController) runWebserver() (err error) {
func (r *AdmissionController) runWebserver() (err error) {

customServer := NewHttpServer()

Expand Down
57 changes: 28 additions & 29 deletions internal/xyz/server.go → internal/admission/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
package xyz
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package admission

import (
"context"
Expand All @@ -9,16 +25,17 @@ import (
"slices"
"time"

//
"freepik.com/admitik/api/v1alpha1"
"freepik.com/admitik/internal/globals"
"freepik.com/admitik/internal/template"

//
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -149,7 +166,7 @@ func (s *HttpServer) handleRequest(response http.ResponseWriter, request *http.R
// Retrieve the sources declared per policy
for sourceIndex, sourceItem := range caPolicyObj.Spec.Sources {

unstructuredSourceObjList, err := getKubeResourceList(request.Context(),
unstructuredSourceObjList, err := getSourcesFromPool(
sourceItem.Group,
sourceItem.Version,
sourceItem.Resource,
Expand All @@ -167,8 +184,8 @@ func (s *HttpServer) handleRequest(response http.ResponseWriter, request *http.R
tmpSources = make(map[int][]map[string]interface{})
}

for _, unstructuredItem := range unstructuredSourceObjList.Items {
tmpSources[sourceIndex] = append(tmpSources[sourceIndex], unstructuredItem.Object)
for _, unstructuredItem := range unstructuredSourceObjList {
tmpSources[sourceIndex] = append(tmpSources[sourceIndex], (*unstructuredItem).Object)
}

specificTemplateInjectedObject["sources"] = tmpSources
Expand Down Expand Up @@ -221,34 +238,16 @@ func (s *HttpServer) handleRequest(response http.ResponseWriter, request *http.R
reviewResponse.Response.Result = &metav1.Status{}
}

// getKubeResourceList returns an unstructuredList of resources selected by params
func getKubeResourceList(ctx context.Context, group, version, resource, namespace, name string) (
resourceList *unstructured.UnstructuredList, err error) {
// getSourcesFromPool returns a list of unstructured resources selected by params from the sources cache
func getSourcesFromPool(group, version, resource, namespace, name string) (
resourceList []*unstructured.Unstructured, err error) {

unstructuredSourceObj := globals.Application.KubeRawClient.Resource(schema.GroupVersionResource{
Group: group,
Version: version,
Resource: resource,
})

sourceListOptions := metav1.ListOptions{}

if namespace != "" {
sourceListOptions.FieldSelector = fmt.Sprintf("metadata.namespace=%s", namespace)
}

if name != "" {
if sourceListOptions.FieldSelector != "" {
sourceListOptions.FieldSelector += ","
}
sourceListOptions.FieldSelector = fmt.Sprintf("metadata.name=%s", name)
}
sourceString := fmt.Sprintf("%s/%s/%s/%s/%s", group, version, resource, namespace, name)

resourceList, err = unstructuredSourceObj.List(ctx, sourceListOptions)
return resourceList, err
return globals.Application.SourceController.GetWatcherResources(sourceString)
}

// createKubeEvent TODO
// createKubeEvent creates a modern event in Kuvernetes with data given by params
func createKubeEvent(ctx context.Context, namespace string, object map[string]interface{},
policy v1alpha1.ClusterAdmissionPolicy, action, message string) (err error) {

Expand Down
18 changes: 17 additions & 1 deletion internal/xyz/utils.go → internal/admission/utils.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
package xyz
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package admission

import (
"errors"
Expand Down
16 changes: 16 additions & 0 deletions internal/certificates/certificates.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package certificates

import (
Expand Down
16 changes: 16 additions & 0 deletions internal/controller/clusteradmissionpolicy_status.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
Expand Down
Loading

0 comments on commit 53d7dd5

Please sign in to comment.