diff --git a/CHANGELOG.md b/CHANGELOG.md index 64d57dcb..06595451 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ This changelog keeps track of work items that have been completed and are ready ### New +- **General**: Support portName in HTTPScaledObject service scaleTargetRef ([#1174](https://github.com/kedacore/http-add-on/issues/1174)) - **General**: Support setting multiple TLS certs for different domains on the interceptor proxy ([#1116](https://github.com/kedacore/http-add-on/issues/1116)) - **General**: TODO ([#TODO](https://github.com/kedacore/http-add-on/issues/TODO)) diff --git a/config/crd/bases/http.keda.sh_httpscaledobjects.yaml b/config/crd/bases/http.keda.sh_httpscaledobjects.yaml index 2c48f3ef..1bcff027 100644 --- a/config/crd/bases/http.keda.sh_httpscaledobjects.yaml +++ b/config/crd/bases/http.keda.sh_httpscaledobjects.yaml @@ -93,8 +93,9 @@ spec: type: integer type: object scaleTargetRef: - description: The name of the deployment to route HTTP requests to - (and to autoscale). + description: |- + The name of the deployment to route HTTP requests to (and to autoscale). + Including validation as a requirement to define either the PortName or the Port properties: apiVersion: type: string @@ -106,13 +107,18 @@ spec: description: The port to route to format: int32 type: integer + portName: + description: The port to route to referenced by name + type: string service: description: The name of the service to route to type: string required: - - port - service type: object + x-kubernetes-validations: + - message: must define either the 'portName' or the 'port' + rule: has(self.portName) != has(self.port) scaledownPeriod: description: (optional) Cooldown period value format: int32 diff --git a/config/interceptor/role.yaml b/config/interceptor/role.yaml index 51c0d076..8cc9a7a2 100644 --- a/config/interceptor/role.yaml +++ b/config/interceptor/role.yaml @@ -12,6 +12,14 @@ rules: - get - list - watch +- apiGroups: + - "" + resources: + - services + verbs: + - get + - list + - watch - apiGroups: - http.keda.sh resources: diff --git a/docs/ref/v0.8.1/http_scaled_object.md b/docs/ref/v0.8.1/http_scaled_object.md new file mode 100644 index 00000000..30e31b2b --- /dev/null +++ b/docs/ref/v0.8.1/http_scaled_object.md @@ -0,0 +1,148 @@ +# The `HTTPScaledObject` + +>This document reflects the specification of the `HTTPScaledObject` resource for the `v0.8.1` version. + +Each `HTTPScaledObject` looks approximately like the below: + +```yaml +kind: HTTPScaledObject +apiVersion: http.keda.sh/v1alpha1 +metadata: + name: xkcd + annotations: + httpscaledobject.keda.sh/skip-scaledobject-creation: "false" +spec: + hosts: + - myhost.com + pathPrefixes: + - /test + scaleTargetRef: + name: xkcd + kind: Deployment + apiVersion: apps/v1 + service: xkcd + port: 8080 + replicas: + min: 5 + max: 10 + scaledownPeriod: 300 + scalingMetric: # requestRate and concurrency are mutually exclusive + requestRate: + granularity: 1s + targetValue: 100 + window: 1m + concurrency: + targetValue: 100 +``` + +This document is a narrated reference guide for the `HTTPScaledObject`. + +## `httpscaledobject.keda.sh/skip-scaledobject-creation` annotation + +This annotation will disable the ScaledObject generation and management but keeping the routing and metrics available. This is done removing the current ScaledObject if it has been already created, allowing to use user managed ScaledObjects pointing the add-on scaler directly (supporting all the ScaledObject configurations and multiple triggers). 
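+For example (an illustrative fragment reusing the `xkcd` example above), opting out of the managed `ScaledObject` so that a user-managed one can target the add-on scaler directly looks like setting the annotation to `"true"`:
+
+```yaml
+kind: HTTPScaledObject
+apiVersion: http.keda.sh/v1alpha1
+metadata:
+  name: xkcd
+  annotations:
+    httpscaledobject.keda.sh/skip-scaledobject-creation: "true"
+```
+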
You can read more about this behavior [here](./../../walkthrough.md#integrating-http-add-on-scaler-with-other-keda-scalers).
+
+## `hosts`
+
+These are the hosts to apply this scaling rule to. All incoming requests with one of these values in their `Host` header will be forwarded to the `Service` and port specified in the below `scaleTargetRef`, and that same `scaleTargetRef`'s workload will be scaled accordingly.
+
+## `pathPrefixes`
+
+>Default: "/"
+
+These are the paths to apply this scaling rule to. All incoming requests with one of these values as their path prefix will be forwarded to the `Service` and port specified in the below `scaleTargetRef`, and that same `scaleTargetRef`'s workload will be scaled accordingly.
+
+## `scaleTargetRef`
+
+This is the primary and most important part of the `spec` because it describes:
+
+1. The incoming host to apply this scaling rule to.
+2. What workload to scale.
+3. The service to which to route HTTP traffic.
+
+### `deployment` (DEPRECATED: removed as part of v0.9.0)
+
+This is the name of the `Deployment` to scale. It must exist in the same namespace as this `HTTPScaledObject` and shouldn't be managed by any other autoscaling system. This means that there should not be any `ScaledObject` already created for this `Deployment`. The HTTP Add-on will manage a `ScaledObject` internally.
+
+### `name`
+
+This is the name of the workload to scale. It must exist in the same namespace as this `HTTPScaledObject` and shouldn't be managed by any other autoscaling system. This means that there should not be any `ScaledObject` already created for this workload. The HTTP Add-on will manage a `ScaledObject` internally.
+
+### `kind`
+
+This is the kind of the workload to scale.
+
+### `apiVersion`
+
+This is the apiVersion of the workload to scale.
+
+### `service`
+
+This is the name of the service to route traffic to. The add-on will create autoscaling and routing components that route to this `Service`. It must exist in the same namespace as this `HTTPScaledObject` and should route to the same `Deployment` as you entered in the `deployment` field.
+
+### `port`
+
+This is the port to route to on the service that you specified in the `service` field. It should be exposed on the service and should route to a valid `containerPort` on the `Deployment` you gave in the `deployment` field.
+
+### `portName`
+
+Alternatively, the port can be referenced using its `name` as defined in the `Service`.
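+
+For example (an illustrative sketch based on the e2e test added in this changeset; the resource names and the port name `http` are placeholders), a `Service` exposing a named port and an `HTTPScaledObject` referencing it via `portName` could look like this:
+
+```yaml
+apiVersion: v1
+kind: Service
+metadata:
+  name: xkcd
+spec:
+  selector:
+    app: xkcd
+  ports:
+  - name: http
+    port: 8080
+    targetPort: http
+    protocol: TCP
+---
+kind: HTTPScaledObject
+apiVersion: http.keda.sh/v1alpha1
+metadata:
+  name: xkcd
+spec:
+  hosts:
+  - myhost.com
+  scaleTargetRef:
+    name: xkcd
+    service: xkcd
+    portName: http    # resolved against the Service's port names instead of a numeric port
+```
+
+Note that `port` and `portName` are mutually exclusive; the CRD validation added in this version (`has(self.portName) != has(self.port)`) rejects objects that set both or neither.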
+
+### `targetPendingRequests` (DEPRECATED: removed as part of v0.9.0)
+
+>Default: 100
+
+This is the number of _pending_ (or in-progress) requests that your application needs to have before the HTTP Add-on will scale it. Conversely, if your application has fewer than this number of pending requests, the HTTP Add-on will scale it down.
+
+For example, if you set this field to 100, the HTTP Add-on will scale your app up if it sees that there are 200 in-progress requests. On the other hand, it will scale down if it sees that there are only 20 in-progress requests. Note that it will _never_ scale your app to zero replicas unless there are _no_ requests in-progress. Even if you set this value to a very high number and only have a single in-progress request, your app will still have one replica.
+
+### `scaledownPeriod`
+
+>Default: 300
+
+The period to wait after the last reported active request before scaling the resource back to 0.
+
+> Note: This time is measured on the KEDA side based on in-flight requests, so workloads with little and sporadic traffic could have unexpected scale-to-zero cases. In those cases we recommend extending this period to make sure it doesn't happen.
+
+## `scalingMetric`
+
+This is the second most important part of the `spec` because it describes how the workload should scale. This section contains two nested sections (`requestRate` and `concurrency`) which are mutually exclusive.
+
+### `requestRate`
+
+This section enables scaling based on the request rate.
+
+> **NOTE**: Request information is stored in memory; aggregating over long periods (longer than 5 minutes) or using too fine a granularity (less than 1 second) could cause performance issues or increased memory usage.
+
+> **NOTE 2**: Although `window` and/or `granularity` can be updated, doing so replaces all of the stored request count information. This can produce unexpected scaling behaviour until the window is populated again.
+
+#### `targetValue`
+
+>Default: 100
+
+This is the target value for the scaling configuration.
+
+#### `window`
+
+>Default: "1m"
+
+This value defines the aggregation window for the request rate calculation.
+
+#### `granularity`
+
+>Default: "1s"
+
+This value defines the granularity of the aggregated requests for the request rate calculation.
+
+### `concurrency`
+
+This section enables scaling based on the request concurrency.
+
+> **NOTE**: This was the only scaling behaviour available before v0.8.0.
+
+#### `targetValue`
+
+>Default: 100
+
+This is the target value for the scaling configuration.
diff --git a/interceptor/main.go b/interceptor/main.go
index 56f45116..5a67887a 100644
--- a/interceptor/main.go
+++ b/interceptor/main.go
@@ -17,6 +17,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"golang.org/x/exp/maps"
 	"golang.org/x/sync/errgroup"
+	k8sinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
@@ -42,6 +43,7 @@ var (
 
 // +kubebuilder:rbac:groups=http.keda.sh,resources=httpscaledobjects,verbs=get;list;watch
 // +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
+// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch
 
 func main() {
 	timeoutCfg := config.MustParseTimeouts()
@@ -85,11 +87,10 @@ func main() {
 		setupLog.Error(err, "creating new Kubernetes ClientSet")
 		os.Exit(1)
 	}
-	endpointsCache := k8s.NewInformerBackedEndpointsCache(
-		ctrl.Log,
-		cl,
-		time.Millisecond*time.Duration(servingCfg.EndpointsCachePollIntervalMS),
-	)
+
+	k8sSharedInformerFactory := k8sinformers.NewSharedInformerFactory(cl, time.Millisecond*time.Duration(servingCfg.EndpointsCachePollIntervalMS))
+	svcCache := k8s.NewInformerBackedServiceCache(ctrl.Log, cl, k8sSharedInformerFactory)
+	endpointsCache := k8s.NewInformerBackedEndpointsCache(ctrl.Log, cl, time.Millisecond*time.Duration(servingCfg.EndpointsCachePollIntervalMS))
 	if err != nil {
 		setupLog.Error(err, "creating new endpoints cache")
 		os.Exit(1)
@@ -123,6 +124,7 @@
 
 		setupLog.Info("starting the endpoints cache")
 		endpointsCache.Start(ctx)
+		k8sSharedInformerFactory.Start(ctx.Done())
 		return nil
 	})
@@ -173,10 +175,11 @@
 		eg.Go(func() error {
 			proxyTLSConfig := map[string]string{"certificatePath": servingCfg.TLSCertPath, "keyPath": servingCfg.TLSKeyPath, "certstorePaths": servingCfg.TLSCertStorePaths}
 			proxyTLSPort := servingCfg.TLSPort
+			k8sSharedInformerFactory.WaitForCacheSync(ctx.Done())
 			setupLog.Info("starting the proxy server with TLS enabled", "port", proxyTLSPort)
 
-			if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable,
timeoutCfg, proxyTLSPort, proxyTLSEnabled, proxyTLSConfig); !util.IsIgnoredErr(err) { + if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, svcCache, timeoutCfg, proxyTLSPort, proxyTLSEnabled, proxyTLSConfig); !util.IsIgnoredErr(err) { setupLog.Error(err, "tls proxy server failed") return err } @@ -186,9 +189,11 @@ func main() { // start a proxy server without TLS. eg.Go(func() error { + k8sSharedInformerFactory.WaitForCacheSync(ctx.Done()) setupLog.Info("starting the proxy server with TLS disabled", "port", proxyPort) - if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) { + k8sSharedInformerFactory.WaitForCacheSync(ctx.Done()) + if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, svcCache, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) { setupLog.Error(err, "proxy server failed") return err } @@ -369,6 +374,7 @@ func runProxyServer( q queue.Counter, waitFunc forwardWaitFunc, routingTable routing.Table, + svcCache k8s.ServiceCache, timeouts *config.Timeouts, port int, tlsEnabled bool, @@ -416,6 +422,7 @@ func runProxyServer( routingTable, probeHandler, upstreamHandler, + svcCache, tlsEnabled, ) rootHandler = middleware.NewLogging( diff --git a/interceptor/main_test.go b/interceptor/main_test.go index 696ab1de..1809c0cd 100644 --- a/interceptor/main_test.go +++ b/interceptor/main_test.go @@ -63,6 +63,7 @@ func TestRunProxyServerCountMiddleware(t *testing.T) { // server routingTable := routingtest.NewTable() routingTable.Memory[host] = httpso + svcCache := k8s.NewFakeServiceCache() timeouts := &config.Timeouts{} waiterCh := make(chan struct{}) @@ -77,6 +78,7 @@ func TestRunProxyServerCountMiddleware(t *testing.T) { q, waitFunc, routingTable, + svcCache, timeouts, port, false, @@ -194,6 +196,7 @@ func TestRunProxyServerWithTLSCountMiddleware(t *testing.T) { // server routingTable := routingtest.NewTable() routingTable.Memory[host] = httpso + svcCache := k8s.NewFakeServiceCache() timeouts := &config.Timeouts{} waiterCh := make(chan struct{}) @@ -209,6 +212,7 @@ func TestRunProxyServerWithTLSCountMiddleware(t *testing.T) { q, waitFunc, routingTable, + svcCache, timeouts, port, true, @@ -339,6 +343,7 @@ func TestRunProxyServerWithMultipleCertsTLSCountMiddleware(t *testing.T) { // server routingTable := routingtest.NewTable() routingTable.Memory[host] = httpso + svcCache := k8s.NewFakeServiceCache() timeouts := &config.Timeouts{} waiterCh := make(chan struct{}) @@ -354,6 +359,7 @@ func TestRunProxyServerWithMultipleCertsTLSCountMiddleware(t *testing.T) { q, waitFunc, routingTable, + svcCache, timeouts, port, true, diff --git a/interceptor/middleware/routing.go b/interceptor/middleware/routing.go index 197f5c50..69c6bcd5 100644 --- a/interceptor/middleware/routing.go +++ b/interceptor/middleware/routing.go @@ -1,6 +1,7 @@ package middleware import ( + "context" "fmt" "net/http" "net/url" @@ -8,6 +9,7 @@ import ( "github.com/kedacore/http-add-on/interceptor/handler" httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1" + "github.com/kedacore/http-add-on/pkg/k8s" "github.com/kedacore/http-add-on/pkg/routing" "github.com/kedacore/http-add-on/pkg/util" ) @@ -21,14 +23,16 @@ type Routing struct { routingTable routing.Table probeHandler http.Handler upstreamHandler http.Handler + svcCache k8s.ServiceCache tlsEnabled bool } -func NewRouting(routingTable routing.Table, probeHandler http.Handler, upstreamHandler http.Handler, tlsEnabled bool) *Routing { 
+func NewRouting(routingTable routing.Table, probeHandler http.Handler, upstreamHandler http.Handler, svcCache k8s.ServiceCache, tlsEnabled bool) *Routing { return &Routing{ routingTable: routingTable, probeHandler: probeHandler, upstreamHandler: upstreamHandler, + svcCache: svcCache, tlsEnabled: tlsEnabled, } } @@ -52,7 +56,7 @@ func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) { } r = r.WithContext(util.ContextWithHTTPSO(r.Context(), httpso)) - stream, err := rm.streamFromHTTPSO(httpso) + stream, err := rm.streamFromHTTPSO(r.Context(), httpso) if err != nil { sh := handler.NewStatic(http.StatusInternalServerError, err) sh.ServeHTTP(w, r) @@ -64,13 +68,36 @@ func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) { rm.upstreamHandler.ServeHTTP(w, r) } -func (rm *Routing) streamFromHTTPSO(httpso *httpv1alpha1.HTTPScaledObject) (*url.URL, error) { +func (rm *Routing) getPort(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject) (int32, error) { + if httpso.Spec.ScaleTargetRef.Port != 0 { + return httpso.Spec.ScaleTargetRef.Port, nil + } + if httpso.Spec.ScaleTargetRef.PortName == "" { + return 0, fmt.Errorf(`must specify either "port" or "portName"`) + } + svc, err := rm.svcCache.Get(ctx, httpso.GetNamespace(), httpso.Spec.ScaleTargetRef.Service) + if err != nil { + return 0, fmt.Errorf("failed to get Service: %w", err) + } + for _, port := range svc.Spec.Ports { + if port.Name == httpso.Spec.ScaleTargetRef.PortName { + return port.Port, nil + } + } + return 0, fmt.Errorf("portName %q not found in Service", httpso.Spec.ScaleTargetRef.PortName) +} + +func (rm *Routing) streamFromHTTPSO(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject) (*url.URL, error) { + port, err := rm.getPort(ctx, httpso) + if err != nil { + return nil, fmt.Errorf("failed to get port: %w", err) + } if rm.tlsEnabled { return url.Parse(fmt.Sprintf( "https://%s.%s:%d", httpso.Spec.ScaleTargetRef.Service, httpso.GetNamespace(), - httpso.Spec.ScaleTargetRef.Port, + port, )) } //goland:noinspection HttpUrlsUsage @@ -78,7 +105,7 @@ func (rm *Routing) streamFromHTTPSO(httpso *httpv1alpha1.HTTPScaledObject) (*url "http://%s.%s:%d", httpso.Spec.ScaleTargetRef.Service, httpso.GetNamespace(), - httpso.Spec.ScaleTargetRef.Port, + port, )) } diff --git a/interceptor/middleware/routing_test.go b/interceptor/middleware/routing_test.go index b26f8086..b57f2321 100644 --- a/interceptor/middleware/routing_test.go +++ b/interceptor/middleware/routing_test.go @@ -6,8 +6,11 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1" + "github.com/kedacore/http-add-on/pkg/k8s" routingtest "github.com/kedacore/http-add-on/pkg/routing/test" ) @@ -22,8 +25,9 @@ var _ = Describe("RoutingMiddleware", func() { emptyHandler := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}) probeHandler.Handle("/probe", emptyHandler) upstreamHandler.Handle("/upstream", emptyHandler) + svcCache := k8s.NewFakeServiceCache() - rm := NewRouting(routingTable, probeHandler, upstreamHandler, false) + rm := NewRouting(routingTable, probeHandler, upstreamHandler, svcCache, false) Expect(rm).NotTo(BeNil()) Expect(rm.routingTable).To(Equal(routingTable)) Expect(rm.probeHandler).To(Equal(probeHandler)) @@ -40,6 +44,7 @@ var _ = Describe("RoutingMiddleware", func() { var ( upstreamHandler *http.ServeMux probeHandler *http.ServeMux + svcCache *k8s.FakeServiceCache routingTable *routingtest.Table routingMiddleware *Routing w *httptest.ResponseRecorder @@ -50,6 +55,39 @@ var _ = Describe("RoutingMiddleware", func() { Hosts: []string{ host, }, + ScaleTargetRef: httpv1alpha1.ScaleTargetRef{ + Port: 80, + }, + }, + } + + httpsoWithPortName = httpv1alpha1.HTTPScaledObject{ + ObjectMeta: metav1.ObjectMeta{ + Name: "keda", + Namespace: "default", + }, + Spec: httpv1alpha1.HTTPScaledObjectSpec{ + Hosts: []string{ + "keda2.sh", + }, + ScaleTargetRef: httpv1alpha1.ScaleTargetRef{ + Service: "keda-svc", + PortName: "http", + }, + }, + } + svc = &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "keda-svc", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "http", + Port: 80, + }, + }, }, } ) @@ -58,7 +96,8 @@ var _ = Describe("RoutingMiddleware", func() { upstreamHandler = http.NewServeMux() probeHandler = http.NewServeMux() routingTable = routingtest.NewTable() - routingMiddleware = NewRouting(routingTable, probeHandler, upstreamHandler, false) + svcCache = k8s.NewFakeServiceCache() + routingMiddleware = NewRouting(routingTable, probeHandler, upstreamHandler, svcCache, false) w = httptest.NewRecorder() @@ -91,7 +130,40 @@ var _ = Describe("RoutingMiddleware", func() { routingTable.Memory[host] = &httpso routingMiddleware.ServeHTTP(w, r) + Expect(uh).To(BeTrue()) + Expect(ph).To(BeFalse()) + Expect(w.Code).To(Equal(sc)) + Expect(w.Body.String()).To(Equal(st)) + }) + }) + + When("route is found with portName", func() { + It("routes to the upstream handler", func() { + svcCache.Add(*svc) + var ( + sc = http.StatusTeapot + st = http.StatusText(sc) + ) + + var uh bool + upstreamHandler.Handle(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusTeapot) + + _, err := w.Write([]byte(st)) + Expect(err).NotTo(HaveOccurred()) + + uh = true + })) + var ph bool + probeHandler.Handle(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ph = true + })) + + routingTable.Memory["keda2.sh"] = &httpsoWithPortName + + r.Host = "keda2.sh" + routingMiddleware.ServeHTTP(w, r) Expect(uh).To(BeTrue()) Expect(ph).To(BeFalse()) Expect(w.Code).To(Equal(sc)) @@ -99,6 +171,39 @@ var _ = Describe("RoutingMiddleware", func() { }) }) + When("route is found with portName but endpoints are mismatched", func() { + It("errors to route to upstream handler", func() { + var ( + sc = http.StatusTeapot + st = http.StatusText(sc) + ) + + var uh bool + upstreamHandler.Handle(path, 
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusTeapot) + + _, err := w.Write([]byte(st)) + Expect(err).NotTo(HaveOccurred()) + + uh = true + })) + + var ph bool + probeHandler.Handle(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ph = true + })) + + routingTable.Memory["keda2.sh"] = &httpsoWithPortName + + r.Host = "keda2.sh" + routingMiddleware.ServeHTTP(w, r) + Expect(uh).To(BeFalse()) + Expect(ph).To(BeFalse()) + Expect(w.Code).To(Equal(http.StatusInternalServerError)) + Expect(w.Body.String()).To(Equal("Internal Server Error")) + }) + }) + When("route is not found", func() { It("routes to the probe handler", func() { const ( diff --git a/interceptor/proxy_handlers_integration_test.go b/interceptor/proxy_handlers_integration_test.go index 6ae02691..8898b275 100644 --- a/interceptor/proxy_handlers_integration_test.go +++ b/interceptor/proxy_handlers_integration_test.go @@ -281,6 +281,7 @@ func newHarness( }, ) + svcCache := k8s.NewFakeServiceCache() endpCache := k8s.NewFakeEndpointsCache() waitFunc := newWorkloadReplicasForwardWaitFunc( logr.Discard(), @@ -308,6 +309,7 @@ func newHarness( respHeaderTimeout: time.Second, }, &tls.Config{}), + svcCache, false, ) diff --git a/operator/apis/http/v1alpha1/httpscaledobject_types.go b/operator/apis/http/v1alpha1/httpscaledobject_types.go index 0b2b039b..9bacfa08 100644 --- a/operator/apis/http/v1alpha1/httpscaledobject_types.go +++ b/operator/apis/http/v1alpha1/httpscaledobject_types.go @@ -31,7 +31,9 @@ type ScaleTargetRef struct { // The name of the service to route to Service string `json:"service"` // The port to route to - Port int32 `json:"port"` + Port int32 `json:"port,omitempty"` + // The port to route to referenced by name + PortName string `json:"portName,omitempty"` } // ReplicaStruct contains the minimum and maximum amount of replicas to have in the deployment @@ -88,6 +90,8 @@ type HTTPScaledObjectSpec struct { // +optional PathPrefixes []string `json:"pathPrefixes,omitempty"` // The name of the deployment to route HTTP requests to (and to autoscale). 
+ // Including validation as a requirement to define either the PortName or the Port + // +kubebuilder:validation:XValidation:rule="has(self.portName) != has(self.port)",message="must define either the 'portName' or the 'port'" ScaleTargetRef ScaleTargetRef `json:"scaleTargetRef"` // (optional) Replica information // +optional diff --git a/pkg/k8s/svc_cache.go b/pkg/k8s/svc_cache.go new file mode 100644 index 00000000..a3c285c4 --- /dev/null +++ b/pkg/k8s/svc_cache.go @@ -0,0 +1,77 @@ +package k8s + +import ( + "context" + "fmt" + "sync" + + "github.com/go-logr/logr" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + listerv1 "k8s.io/client-go/listers/core/v1" +) + +// ServiceCache is an interface for caching service objects +type ServiceCache interface { + // Get gets a service with the given namespace and name from the cache + // If the service doesn't exist in the cache, it will be fetched from the API server + Get(ctx context.Context, namespace, name string) (*v1.Service, error) +} + +// InformerBackedServicesCache is a cache of services backed by a shared informer +type InformerBackedServicesCache struct { + lggr logr.Logger + cl kubernetes.Interface + svcLister listerv1.ServiceLister +} + +// FakeServiceCache is a fake implementation of a ServiceCache for testing +type FakeServiceCache struct { + current map[string]v1.Service + mut sync.RWMutex +} + +// NewInformerBackedServiceCache creates a new InformerBackedServicesCache +func NewInformerBackedServiceCache(lggr logr.Logger, cl kubernetes.Interface, factory informers.SharedInformerFactory) *InformerBackedServicesCache { + return &InformerBackedServicesCache{ + lggr: lggr.WithName("InformerBackedServicesCache"), + cl: cl, + svcLister: factory.Core().V1().Services().Lister(), + } +} + +// Get gets a service with the given namespace and name from the cache and as a fallback from the API server +func (c *InformerBackedServicesCache) Get(ctx context.Context, namespace, name string) (*v1.Service, error) { + svc, err := c.svcLister.Services(namespace).Get(name) + if err == nil { + c.lggr.V(1).Info("Service found in cache", "namespace", namespace, "name", name) + return svc, nil + } + c.lggr.V(1).Info("Service not found in cache, fetching from API server", "namespace", namespace, "name", name, "error", err) + return c.cl.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{}) +} + +// NewFakeServiceCache creates a new FakeServiceCache +func NewFakeServiceCache() *FakeServiceCache { + return &FakeServiceCache{current: make(map[string]v1.Service)} +} + +// Get gets a service with the given namespace and name from the cache +func (c *FakeServiceCache) Get(_ context.Context, namespace, name string) (*v1.Service, error) { + c.mut.RLock() + defer c.mut.RUnlock() + svc, ok := c.current[key(namespace, name)] + if !ok { + return nil, fmt.Errorf("service not found") + } + return &svc, nil +} + +// Add adds a service to the cache +func (c *FakeServiceCache) Add(svc v1.Service) { + c.mut.Lock() + defer c.mut.Unlock() + c.current[key(svc.Namespace, svc.Name)] = svc +} diff --git a/tests/checks/internal_service_port_name/internal_service_port_name_test.go b/tests/checks/internal_service_port_name/internal_service_port_name_test.go new file mode 100644 index 00000000..eb8db1de --- /dev/null +++ b/tests/checks/internal_service_port_name/internal_service_port_name_test.go @@ -0,0 +1,183 @@ +//go:build e2e +// +build e2e + +package 
internal_service_port_name_test + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/http-add-on/tests/helper" +) + +const ( + testName = "internal-service-port-name-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + serviceName = fmt.Sprintf("%s-service", testName) + httpScaledObjectName = fmt.Sprintf("%s-http-so", testName) + host = testName + minReplicaCount = 0 + maxReplicaCount = 1 +) + +type templateData struct { + TestNamespace string + DeploymentName string + ServiceName string + HTTPScaledObjectName string + Host string + MinReplicas int + MaxReplicas int +} + +const ( + serviceTemplate = ` +apiVersion: v1 +kind: Service +metadata: + name: {{.ServiceName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + ports: + - port: 8080 + targetPort: http + protocol: TCP + name: http + selector: + app: {{.DeploymentName}} +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: {{.DeploymentName}} + image: registry.k8s.io/e2e-test-images/agnhost:2.45 + args: + - netexec + ports: + - name: http + containerPort: 8080 + protocol: TCP + readinessProbe: + httpGet: + path: / + port: http +` + + loadJobTemplate = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: generate-request + namespace: {{.TestNamespace}} +spec: + template: + spec: + containers: + - name: curl-client + image: curlimages/curl + imagePullPolicy: Always + command: ["curl", "-H", "Host: {{.Host}}", "keda-http-add-on-interceptor-proxy.keda:8080"] + restartPolicy: Never + activeDeadlineSeconds: 600 + backoffLimit: 5 +` + + httpScaledObjectTemplate = ` +kind: HTTPScaledObject +apiVersion: http.keda.sh/v1alpha1 +metadata: + name: {{.HTTPScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + hosts: + - {{.Host}} + targetPendingRequests: 100 + scaledownPeriod: 10 + scaleTargetRef: + name: {{.DeploymentName}} + service: {{.ServiceName}} + portName: http + replicas: + min: {{ .MinReplicas }} + max: {{ .MaxReplicas }} +` +) + +func TestCheck(t *testing.T) { + // setup + t.Log("--- setting up ---") + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 6, 10), + "replica count should be %d after 1 minutes", minReplicaCount) + + testScaleOut(t, kc, data) + testScaleIn(t, kc, data) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scale out ---") + + time.Sleep(5 * time.Second) + KubectlApplyWithTemplate(t, data, "loadJobTemplate", loadJobTemplate) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 6, 10), + "replica count should be %d after 1 minutes", maxReplicaCount) +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scale out ---") + + KubectlDeleteWithTemplate(t, data, "loadJobTemplate", 
loadJobTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 12, 10), + "replica count should be %d after 2 minutes", minReplicaCount) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ServiceName: serviceName, + HTTPScaledObjectName: httpScaledObjectName, + Host: host, + MinReplicas: minReplicaCount, + MaxReplicas: maxReplicaCount, + }, []Template{ + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "serviceNameTemplate", Config: serviceTemplate}, + {Name: "httpScaledObjectTemplate", Config: httpScaledObjectTemplate}, + } +}
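
Taken together, the changes in `interceptor/main.go`, `pkg/k8s/svc_cache.go`, and `interceptor/middleware/routing.go` can be summarized with a minimal, self-contained sketch (not part of this diff; the client bootstrap, the resync period, and the `default/xkcd` Service with a port named `http` are placeholder assumptions): the shared informer factory backs the new `ServiceCache`, and the interceptor resolves `portName` against the cached `Service` the same way `Routing.getPort` does.

```go
package main

import (
	"context"
	"fmt"
	"time"

	k8sinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/kedacore/http-add-on/pkg/k8s"
)

func main() {
	ctx := context.Background()

	// Build a clientset from the in-cluster config (placeholder bootstrap).
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	cl := kubernetes.NewForConfigOrDie(cfg)

	// The shared informer factory backs the informer-based Service cache
	// added in pkg/k8s/svc_cache.go; the resync period here is illustrative.
	factory := k8sinformers.NewSharedInformerFactory(cl, 250*time.Millisecond)
	svcCache := k8s.NewInformerBackedServiceCache(ctrl.Log, cl, factory)
	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	// Resolve a named port: look the Service up in the cache (falling back
	// to the API server) and match ServicePort.Name, as the routing
	// middleware now does when only portName is set.
	svc, err := svcCache.Get(ctx, "default", "xkcd")
	if err != nil {
		panic(err)
	}
	for _, p := range svc.Spec.Ports {
		if p.Name == "http" {
			fmt.Printf("routing to port %d\n", p.Port)
		}
	}
}
```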