diff --git a/pkg/providers/apisix/apisix_upstream.go b/pkg/providers/apisix/apisix_upstream.go index 4d9e29b239..d8a12a25cf 100644 --- a/pkg/providers/apisix/apisix_upstream.go +++ b/pkg/providers/apisix/apisix_upstream.go @@ -278,15 +278,14 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration()) return err } - var newUps *apisixv1.Upstream + newUps := apisixv1.NewDefaultUpstream() if au.Spec != nil && ev.Type != types.EventDelete { cfg, ok := portLevelSettings[port.Port] if !ok { cfg = au.Spec.ApisixUpstreamConfig } // FIXME Same ApisixUpstreamConfig might be translated multiple times. - newUps, err = c.translator.TranslateUpstreamConfigV2(&cfg) - if err != nil { + if err = c.translator.TranslateUpstreamConfigV2(&cfg, newUps); err != nil { log.Errorw("ApisixUpstream conversion cannot be completed, or the format is incorrect", zap.Any("object", au), zap.Error(err), @@ -295,8 +294,6 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration()) return err } - } else { - newUps = apisixv1.NewDefaultUpstream() } newUps.Metadata = ups.Metadata diff --git a/pkg/providers/apisix/translation/apisix_plugin.go b/pkg/providers/apisix/translation/apisix_plugin.go index 1115cfe2f0..6e64452704 100644 --- a/pkg/providers/apisix/translation/apisix_plugin.go +++ b/pkg/providers/apisix/translation/apisix_plugin.go @@ -46,11 +46,7 @@ func (t *translator) translateTrafficSplitPlugin(ctx *translation.TranslateConte ) for _, backend := range backends { - svcClusterIP, svcPort, err := t.GetServiceClusterIPAndPort(&backend, ns) - if err != nil { - return nil, err - } - ups, err := t.translateService(ns, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) + ups, err := t.TranslateUpstream(ns, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) if err != nil { return nil, err } diff --git a/pkg/providers/apisix/translation/apisix_plugin_test.go b/pkg/providers/apisix/translation/apisix_plugin_test.go index 7c33487312..cfa6144b48 100644 --- a/pkg/providers/apisix/translation/apisix_plugin_test.go +++ b/pkg/providers/apisix/translation/apisix_plugin_test.go @@ -546,7 +546,7 @@ func TestTranslateTrafficSplitPluginBadCases(t *testing.T) { cfg, err = tr.translateTrafficSplitPlugin(ctx, ar1.Namespace, 30, backends) assert.Nil(t, cfg) assert.NotNil(t, err) - assert.Equal(t, "service.spec.ports: port not defined", err.Error()) + assert.Contains(t, err.Error(), "service.Spec.Ports: port port-not-found not found") backends[1].ServicePort.StrVal = "port2" backends[1].ResolveGranularity = "service" diff --git a/pkg/providers/apisix/translation/apisix_route.go b/pkg/providers/apisix/translation/apisix_route.go index d5ab4bb6ad..ed006fbcf0 100644 --- a/pkg/providers/apisix/translation/apisix_route.go +++ b/pkg/providers/apisix/translation/apisix_route.go @@ -21,7 +21,6 @@ import ( "strings" "go.uber.org/zap" - "k8s.io/apimachinery/pkg/util/intstr" "github.com/apache/apisix-ingress-controller/pkg/config" "github.com/apache/apisix-ingress-controller/pkg/id" @@ -120,16 +119,6 @@ func (t *translator) translateHTTPRouteV2beta2(ctx *translation.TranslateContext backend := backends[0] backends = backends[1:] - svcClusterIP, svcPort, err := t.GetServiceClusterIPAndPort(&backend, ar.Namespace) - if err != nil { - log.Errorw("failed to get service port in backend", - zap.Any("backend", backend), - zap.Any("apisix_route", ar), - zap.Error(err), - ) - return err - } - pluginMap := make(apisixv1.Plugins) // add route plugins for _, plugin := range part.Plugins { @@ -156,6 +145,7 @@ func (t *translator) translateHTTPRouteV2beta2(ctx *translation.TranslateContext } var exprs [][]apisixv1.StringOrSlice + var err error if part.Match.NginxVars != nil { exprs, err = t.TranslateRouteMatchExprs(part.Match.NginxVars) if err != nil { @@ -175,7 +165,6 @@ func (t *translator) translateHTTPRouteV2beta2(ctx *translation.TranslateContext return err } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort, backend.ResolveGranularity) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) @@ -185,7 +174,6 @@ func (t *translator) translateHTTPRouteV2beta2(ctx *translation.TranslateContext route.Hosts = part.Match.Hosts route.Uris = part.Match.Paths route.Methods = part.Match.Methods - route.UpstreamId = id.GenID(upstreamName) route.EnableWebsocket = part.Websocket route.Plugins = pluginMap @@ -206,14 +194,15 @@ func (t *translator) translateHTTPRouteV2beta2(ctx *translation.TranslateContext } route.Plugins["traffic-split"] = plugin } - ctx.AddRoute(route) - if !ctx.CheckUpstreamExist(upstreamName) { - ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) - if err != nil { - return err - } + ups, err := t.TranslateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) + if err != nil { + return err + } + if !ctx.CheckUpstreamExist(ups.Name) { ctx.AddUpstream(ups) } + route.UpstreamId = ups.ID + ctx.AddRoute(route) } return nil } @@ -231,16 +220,6 @@ func (t *translator) translateHTTPRouteV2beta3(ctx *translation.TranslateContext backend := backends[0] backends = backends[1:] - svcClusterIP, svcPort, err := t.GetServiceClusterIPAndPort(&backend, ar.Namespace) - if err != nil { - log.Errorw("failed to get service port in backend", - zap.Any("backend", backend), - zap.Any("apisix_route", ar), - zap.Error(err), - ) - return err - } - var timeout *apisixv1.UpstreamTimeout if part.Timeout != nil { timeout = &apisixv1.UpstreamTimeout{ @@ -290,6 +269,7 @@ func (t *translator) translateHTTPRouteV2beta3(ctx *translation.TranslateContext } var exprs [][]apisixv1.StringOrSlice + var err error if part.Match.NginxVars != nil { exprs, err = t.TranslateRouteMatchExprs(part.Match.NginxVars) if err != nil { @@ -309,7 +289,6 @@ func (t *translator) translateHTTPRouteV2beta3(ctx *translation.TranslateContext return err } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort, backend.ResolveGranularity) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) @@ -319,7 +298,6 @@ func (t *translator) translateHTTPRouteV2beta3(ctx *translation.TranslateContext route.Hosts = part.Match.Hosts route.Uris = part.Match.Paths route.Methods = part.Match.Methods - route.UpstreamId = id.GenID(upstreamName) route.EnableWebsocket = part.Websocket route.Plugins = pluginMap route.Timeout = timeout @@ -342,14 +320,15 @@ func (t *translator) translateHTTPRouteV2beta3(ctx *translation.TranslateContext } route.Plugins["traffic-split"] = plugin } - ctx.AddRoute(route) - if !ctx.CheckUpstreamExist(upstreamName) { - ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) - if err != nil { - return err - } + ups, err := t.TranslateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) + if err != nil { + return err + } + if !ctx.CheckUpstreamExist(ups.Name) { ctx.AddUpstream(ups) } + route.UpstreamId = ups.ID + ctx.AddRoute(route) } return nil } @@ -367,16 +346,6 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar backend := backends[0] backends = backends[1:] - svcClusterIP, svcPort, err := t.GetServiceClusterIPAndPort(&backend, ar.Namespace) - if err != nil { - log.Errorw("failed to get service port in backend", - zap.Any("backend", backend), - zap.Any("apisix_route", ar), - zap.Error(err), - ) - return err - } - var timeout *apisixv1.UpstreamTimeout if part.Timeout != nil { timeout = &apisixv1.UpstreamTimeout{ @@ -426,6 +395,7 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar } var exprs [][]apisixv1.StringOrSlice + var err error if part.Match.NginxVars != nil { exprs, err = t.TranslateRouteMatchExprs(part.Match.NginxVars) if err != nil { @@ -445,7 +415,6 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar return err } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort, backend.ResolveGranularity) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) @@ -455,7 +424,6 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar route.Hosts = part.Match.Hosts route.Uris = part.Match.Paths route.Methods = part.Match.Methods - route.UpstreamId = id.GenID(upstreamName) route.EnableWebsocket = part.Websocket route.Plugins = pluginMap route.Timeout = timeout @@ -478,14 +446,15 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar } route.Plugins["traffic-split"] = plugin } - ctx.AddRoute(route) - if !ctx.CheckUpstreamExist(upstreamName) { - ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) - if err != nil { - return err - } + ups, err := t.TranslateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) + if err != nil { + return err + } + if !ctx.CheckUpstreamExist(ups.Name) { ctx.AddUpstream(ups) } + route.UpstreamId = ups.ID + ctx.AddRoute(route) } return nil } @@ -739,20 +708,12 @@ func (t *translator) translateStreamRouteV2beta2(ctx *translation.TranslateConte } ruleNameMap[part.Name] = struct{}{} backend := part.Backend - svcClusterIP, svcPort, err := t.getStreamServiceClusterIPAndPortV2beta2(backend, ar.Namespace) - if err != nil { - log.Errorw("failed to get service port in backend", - zap.Any("backend", backend), - zap.Any("apisix_route", ar), - zap.Error(err), - ) - return err - } + sr := apisixv1.NewDefaultStreamRoute() name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) sr.ID = id.GenID(name) sr.ServerPort = part.Match.IngressPort - ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) + ups, err := t.TranslateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) if err != nil { return err } @@ -774,20 +735,11 @@ func (t *translator) translateStreamRouteV2beta3(ctx *translation.TranslateConte } ruleNameMap[part.Name] = struct{}{} backend := part.Backend - svcClusterIP, svcPort, err := t.getStreamServiceClusterIPAndPortV2beta3(backend, ar.Namespace) - if err != nil { - log.Errorw("failed to get service port in backend", - zap.Any("backend", backend), - zap.Any("apisix_route", ar), - zap.Error(err), - ) - return err - } sr := apisixv1.NewDefaultStreamRoute() name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) sr.ID = id.GenID(name) sr.ServerPort = part.Match.IngressPort - ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) + ups, err := t.TranslateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) if err != nil { return err } @@ -809,15 +761,6 @@ func (t *translator) translateStreamRouteV2(ctx *translation.TranslateContext, a } ruleNameMap[part.Name] = struct{}{} backend := part.Backend - svcClusterIP, svcPort, err := t.getStreamServiceClusterIPAndPortV2(backend, ar.Namespace) - if err != nil { - log.Errorw("failed to get service port in backend", - zap.Any("backend", backend), - zap.Any("apisix_route", ar), - zap.Error(err), - ) - return err - } // add stream route plugins pluginMap := make(apisixv1.Plugins) @@ -836,7 +779,7 @@ func (t *translator) translateStreamRouteV2(ctx *translation.TranslateContext, a name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) sr.ID = id.GenID(name) sr.ServerPort = part.Match.IngressPort - ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) + ups, err := t.TranslateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, backend.ServicePort) if err != nil { return err } @@ -914,165 +857,6 @@ func (t *translator) translateStreamRouteNotStrictlyV2(ctx *translation.Translat return nil } -func (t *translator) GetServiceClusterIPAndPort(backend *configv2.ApisixRouteHTTPBackend, ns string) (string, int32, error) { - svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName) - if err != nil { - return "", 0, err - } - svcPort := int32(-1) - if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { - log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", - zap.Any("namespace", ns), - zap.Any("service", svc), - ) - return "", 0, errors.New("conflict headless service and backend resolve granularity") - } -loop: - for _, port := range svc.Spec.Ports { - switch backend.ServicePort.Type { - case intstr.Int: - if backend.ServicePort.IntVal == port.Port { - svcPort = port.Port - break loop - } - case intstr.String: - if backend.ServicePort.StrVal == port.Name { - svcPort = port.Port - break loop - } - } - } - if svcPort == -1 { - log.Errorw("ApisixRoute refers to non-existent Service port", - zap.String("namespace", ns), - zap.String("port", backend.ServicePort.String()), - ) - return "", 0, err - } - - return svc.Spec.ClusterIP, svcPort, nil -} - -// getStreamServiceClusterIPAndPortV2beta2 is for v2beta2 streamRoute -func (t *translator) getStreamServiceClusterIPAndPortV2beta2(backend configv2beta2.ApisixRouteStreamBackend, ns string) (string, int32, error) { - svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName) - if err != nil { - return "", 0, err - } - svcPort := int32(-1) - if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { - log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", - zap.String("ApisixRoute namespace", ns), - zap.Any("service", svc), - ) - return "", 0, errors.New("conflict headless service and backend resolve granularity") - } -loop: - for _, port := range svc.Spec.Ports { - switch backend.ServicePort.Type { - case intstr.Int: - if backend.ServicePort.IntVal == port.Port { - svcPort = port.Port - break loop - } - case intstr.String: - if backend.ServicePort.StrVal == port.Name { - svcPort = port.Port - break loop - } - } - } - if svcPort == -1 { - log.Errorw("ApisixRoute refers to non-existent Service port", - zap.String("ApisixRoute namespace", ns), - zap.String("port", backend.ServicePort.String()), - ) - return "", 0, err - } - - return svc.Spec.ClusterIP, svcPort, nil -} - -// getStreamServiceClusterIPAndPortV2beta3 is for v2beta3 streamRoute -func (t *translator) getStreamServiceClusterIPAndPortV2beta3(backend configv2beta3.ApisixRouteStreamBackend, ns string) (string, int32, error) { - svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName) - if err != nil { - return "", 0, err - } - svcPort := int32(-1) - if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { - log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", - zap.String("ApisixRoute namespace", ns), - zap.Any("service", svc), - ) - return "", 0, errors.New("conflict headless service and backend resolve granularity") - } -loop: - for _, port := range svc.Spec.Ports { - switch backend.ServicePort.Type { - case intstr.Int: - if backend.ServicePort.IntVal == port.Port { - svcPort = port.Port - break loop - } - case intstr.String: - if backend.ServicePort.StrVal == port.Name { - svcPort = port.Port - break loop - } - } - } - if svcPort == -1 { - log.Errorw("ApisixRoute refers to non-existent Service port", - zap.String("ApisixRoute namespace", ns), - zap.String("port", backend.ServicePort.String()), - ) - return "", 0, err - } - - return svc.Spec.ClusterIP, svcPort, nil -} - -// getStreamServiceClusterIPAndPortV2 is for v2 streamRoute -func (t *translator) getStreamServiceClusterIPAndPortV2(backend configv2.ApisixRouteStreamBackend, ns string) (string, int32, error) { - svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName) - if err != nil { - return "", 0, err - } - svcPort := int32(-1) - if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { - log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", - zap.String("ApisixRoute namespace", ns), - zap.Any("service", svc), - ) - return "", 0, errors.New("conflict headless service and backend resolve granularity") - } -loop: - for _, port := range svc.Spec.Ports { - switch backend.ServicePort.Type { - case intstr.Int: - if backend.ServicePort.IntVal == port.Port { - svcPort = port.Port - break loop - } - case intstr.String: - if backend.ServicePort.StrVal == port.Name { - svcPort = port.Port - break loop - } - } - } - if svcPort == -1 { - log.Errorw("ApisixRoute refers to non-existent Service port", - zap.String("ApisixRoute namespace", ns), - zap.String("port", backend.ServicePort.String()), - ) - return "", 0, err - } - - return svc.Spec.ClusterIP, svcPort, nil -} - func (t *translator) TranslateOldRoute(ar kube.ApisixRoute) (*translation.TranslateContext, error) { switch ar.GroupVersion() { case config.ApisixV2: diff --git a/pkg/providers/apisix/translation/apisix_upstream.go b/pkg/providers/apisix/translation/apisix_upstream.go index d3d725af50..1cae49fbf4 100644 --- a/pkg/providers/apisix/translation/apisix_upstream.go +++ b/pkg/providers/apisix/translation/apisix_upstream.go @@ -16,8 +16,6 @@ package translation import ( "github.com/apache/apisix-ingress-controller/pkg/id" - "github.com/apache/apisix-ingress-controller/pkg/providers/translation" - "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) @@ -28,22 +26,3 @@ func (t *translator) translateUpstreamNotStrictly(namespace, svcName, subset str ups.ID = id.GenID(ups.Name) return ups, nil } - -func (t *translator) translateService(namespace, svcName, subset, svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, error) { - ups, err := t.TranslateService(namespace, svcName, subset, svcPort) - if err != nil { - return nil, err - } - if svcResolveGranularity == types.ResolveGranularity.Service { - ups.Nodes = apisixv1.UpstreamNodes{ - { - Host: svcClusterIP, - Port: int(svcPort), - Weight: translation.DefaultWeight, - }, - } - } - ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, subset, svcPort, svcResolveGranularity) - ups.ID = id.GenID(ups.Name) - return ups, nil -} diff --git a/pkg/providers/gateway/translation/gateway_httproute.go b/pkg/providers/gateway/translation/gateway_httproute.go index cf442afa59..eed6426094 100644 --- a/pkg/providers/gateway/translation/gateway_httproute.go +++ b/pkg/providers/gateway/translation/gateway_httproute.go @@ -22,6 +22,7 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/intstr" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" "github.com/apache/apisix-ingress-controller/pkg/id" @@ -92,7 +93,7 @@ func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha continue } - ups, err := t.KubeTranslator.TranslateService(ns, string(backend.Name), "", int32(*backend.Port)) + ups, err := t.KubeTranslator.TranslateUpstream(ns, string(backend.Name), "", "", intstr.FromInt(int(*backend.Port))) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to translate Rules[%v].BackendRefs[%v]", i, j)) } diff --git a/pkg/providers/gateway/translation/gateway_tlsroute.go b/pkg/providers/gateway/translation/gateway_tlsroute.go index ced0f1a7d5..0e71143e9a 100644 --- a/pkg/providers/gateway/translation/gateway_tlsroute.go +++ b/pkg/providers/gateway/translation/gateway_tlsroute.go @@ -22,6 +22,7 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/intstr" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" "github.com/apache/apisix-ingress-controller/pkg/id" @@ -86,7 +87,7 @@ func (t *translator) TranslateGatewayTLSRouteV1Alpha2(tlsRoute *gatewayv1alpha2. continue } - ups, err := t.KubeTranslator.TranslateService(ns, string(backend.Name), "", int32(*backend.Port)) + ups, err := t.KubeTranslator.TranslateUpstream(ns, string(backend.Name), "", "", intstr.FromInt(int(*backend.Port))) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to translate Rules[%v].BackendRefs[%v]", i, j)) } diff --git a/pkg/providers/ingress/translation/translator.go b/pkg/providers/ingress/translation/translator.go index c4ed6426e0..0c0bd978cf 100644 --- a/pkg/providers/ingress/translation/translator.go +++ b/pkg/providers/ingress/translation/translator.go @@ -38,7 +38,6 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/log" apisixtranslation "github.com/apache/apisix-ingress-controller/pkg/providers/apisix/translation" "github.com/apache/apisix-ingress-controller/pkg/providers/translation" - "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) @@ -59,7 +58,7 @@ type IngressTranslator interface { // TranslateIngress composes a couple of APISIX Routes and upstreams according // to the given Ingress resource. // For old objects, you cannot use TranslateIngress to build. Because it needs to parse the latest service, which will cause data inconsistency. - TranslateIngress(ing kube.Ingress, args ...bool) (*translation.TranslateContext, error) + TranslateIngress(ing kube.Ingress) (*translation.TranslateContext, error) // TranslateOldIngress get route objects from cache // Build upstream and plugin_config through route TranslateOldIngress(kube.Ingress) (*translation.TranslateContext, error) @@ -102,18 +101,14 @@ func (t *translator) TranslateIngressTLS(namespace, ingName, secretName string, return t.ApisixTranslator.TranslateSSLV2(&apisixTls) } -func (t *translator) TranslateIngress(ing kube.Ingress, args ...bool) (*translation.TranslateContext, error) { - var skipVerify = false - if len(args) != 0 { - skipVerify = args[0] - } +func (t *translator) TranslateIngress(ing kube.Ingress) (*translation.TranslateContext, error) { switch ing.GroupVersion() { case kube.IngressV1: - return t.translateIngressV1(ing.V1(), skipVerify) + return t.translateIngressV1(ing.V1()) case kube.IngressV1beta1: - return t.translateIngressV1beta1(ing.V1beta1(), skipVerify) + return t.translateIngressV1beta1(ing.V1beta1()) case kube.IngressExtensionsV1beta1: - return t.translateIngressExtensionsV1beta1(ing.ExtensionsV1beta1(), skipVerify) + return t.translateIngressExtensionsV1beta1(ing.ExtensionsV1beta1()) default: return nil, fmt.Errorf("translator: source group version not supported: %s", ing.GroupVersion()) } @@ -123,7 +118,7 @@ const ( _regexPriority = 100 ) -func (t *translator) translateIngressV1(ing *networkingv1.Ingress, skipVerify bool) (*translation.TranslateContext, error) { +func (t *translator) translateIngressV1(ing *networkingv1.Ingress) (*translation.TranslateContext, error) { ctx := translation.DefaultEmptyTranslateContext() ingress := t.TranslateAnnotations(ing.Annotations) @@ -146,17 +141,19 @@ func (t *translator) translateIngressV1(ing *networkingv1.Ingress, skipVerify bo err error ) if pathRule.Backend.Service != nil { - if skipVerify { - ups = t.translateDefaultUpstreamFromIngressV1(ing.Namespace, pathRule.Backend.Service) + var port intstr.IntOrString + if pathRule.Backend.Service.Port.Name != "" { + port = intstr.FromString(pathRule.Backend.Service.Port.Name) } else { - ups, err = t.translateUpstreamFromIngressV1(ing.Namespace, pathRule.Backend.Service) - if err != nil { - log.Errorw("failed to translate ingress backend to upstream", - zap.Error(err), - zap.Any("ingress", ing), - ) - return nil, err - } + port = intstr.FromInt(int(pathRule.Backend.Service.Port.Number)) + } + ups, err = t.TranslateUpstream(ing.Namespace, pathRule.Backend.Service.Name, "", "", port) + if err != nil { + log.Errorw("failed to translate ingress backend to upstream", + zap.Error(err), + zap.Any("ingress", ing), + ) + return nil, err } ctx.AddUpstream(ups) } @@ -221,7 +218,7 @@ func (t *translator) translateIngressV1(ing *networkingv1.Ingress, skipVerify bo return ctx, nil } -func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress, skipVerify bool) (*translation.TranslateContext, error) { +func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress) (*translation.TranslateContext, error) { ctx := translation.DefaultEmptyTranslateContext() ingress := t.TranslateAnnotations(ing.Annotations) @@ -244,17 +241,13 @@ func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress, ski err error ) if pathRule.Backend.ServiceName != "" { - if skipVerify { - ups = t.translateDefaultUpstreamFromIngressV1beta1(ing.Namespace, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) - } else { - ups, err = t.translateUpstreamFromIngressV1beta1(ing.Namespace, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) - if err != nil { - log.Errorw("failed to translate ingress backend to upstream", - zap.Error(err), - zap.Any("ingress", ing), - ) - return nil, err - } + ups, err = t.TranslateUpstream(ing.Namespace, pathRule.Backend.ServiceName, "", "", pathRule.Backend.ServicePort) + if err != nil { + log.Errorw("failed to translate ingress backend to upstream", + zap.Error(err), + zap.Any("ingress", ing), + ) + return nil, err } ctx.AddUpstream(ups) } @@ -319,61 +312,7 @@ func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress, ski return ctx, nil } -func (t *translator) translateDefaultUpstreamFromIngressV1(namespace string, backend *networkingv1.IngressServiceBackend) *apisixv1.Upstream { - var portNumber int32 - if backend.Port.Name != "" { - svc, err := t.ServiceLister.Services(namespace).Get(backend.Name) - if err != nil { - portNumber = 0 - } else { - for _, port := range svc.Spec.Ports { - if port.Name == backend.Port.Name { - portNumber = port.Port - break - } - } - } - - } else { - portNumber = backend.Port.Number - } - ups := apisixv1.NewDefaultUpstream() - ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, "", portNumber, types.ResolveGranularity.Endpoint) - ups.ID = id.GenID(ups.Name) - return ups -} -func (t *translator) translateUpstreamFromIngressV1(namespace string, backend *networkingv1.IngressServiceBackend) (*apisixv1.Upstream, error) { - var svcPort int32 - if backend.Port.Name != "" { - svc, err := t.ServiceLister.Services(namespace).Get(backend.Name) - if err != nil { - return nil, err - } - for _, port := range svc.Spec.Ports { - if port.Name == backend.Port.Name { - svcPort = port.Port - break - } - } - if svcPort == 0 { - return nil, &translation.TranslateError{ - Field: "service", - Reason: "port not found", - } - } - } else { - svcPort = backend.Port.Number - } - ups, err := t.TranslateService(namespace, backend.Name, "", svcPort) - if err != nil { - return nil, err - } - ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, "", svcPort, types.ResolveGranularity.Endpoint) - ups.ID = id.GenID(ups.Name) - return ups, nil -} - -func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.Ingress, skipVerify bool) (*translation.TranslateContext, error) { +func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.Ingress) (*translation.TranslateContext, error) { ctx := translation.DefaultEmptyTranslateContext() ingress := t.TranslateAnnotations(ing.Annotations) @@ -385,17 +324,13 @@ func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.In ) if pathRule.Backend.ServiceName != "" { // Structure here is same to ingress.extensions/v1beta1, so just use this method. - if skipVerify { - ups = t.translateDefaultUpstreamFromIngressV1beta1(ing.Namespace, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) - } else { - ups, err = t.translateUpstreamFromIngressV1beta1(ing.Namespace, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) - if err != nil { - log.Errorw("failed to translate ingress backend to upstream", - zap.Error(err), - zap.Any("ingress", ing), - ) - return nil, err - } + ups, err = t.TranslateUpstream(ing.Namespace, pathRule.Backend.ServiceName, "", "", pathRule.Backend.ServicePort) + if err != nil { + log.Errorw("failed to translate ingress backend to upstream", + zap.Error(err), + zap.Any("ingress", ing), + ) + return nil, err } ctx.AddUpstream(ups) } @@ -461,60 +396,6 @@ func (t *translator) translateIngressExtensionsV1beta1(ing *extensionsv1beta1.In return ctx, nil } -func (t *translator) translateDefaultUpstreamFromIngressV1beta1(namespace string, svcName string, svcPort intstr.IntOrString) *apisixv1.Upstream { - var portNumber int32 - if svcPort.Type == intstr.String { - svc, err := t.ServiceLister.Services(namespace).Get(svcName) - if err != nil { - portNumber = 0 - } else { - for _, port := range svc.Spec.Ports { - if port.Name == svcPort.StrVal { - portNumber = port.Port - break - } - } - } - } else { - portNumber = svcPort.IntVal - } - ups := apisixv1.NewDefaultUpstream() - ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, "", portNumber, types.ResolveGranularity.Endpoint) - ups.ID = id.GenID(ups.Name) - return ups -} - -func (t *translator) translateUpstreamFromIngressV1beta1(namespace string, svcName string, svcPort intstr.IntOrString) (*apisixv1.Upstream, error) { - var portNumber int32 - if svcPort.Type == intstr.String { - svc, err := t.ServiceLister.Services(namespace).Get(svcName) - if err != nil { - return nil, err - } - for _, port := range svc.Spec.Ports { - if port.Name == svcPort.StrVal { - portNumber = port.Port - break - } - } - if portNumber == 0 { - return nil, &translation.TranslateError{ - Field: "service", - Reason: "port not found", - } - } - } else { - portNumber = svcPort.IntVal - } - ups, err := t.TranslateService(namespace, svcName, "", portNumber) - if err != nil { - return nil, err - } - ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, "", portNumber, types.ResolveGranularity.Endpoint) - ups.ID = id.GenID(ups.Name) - return ups, nil -} - func (t *translator) TranslateOldIngress(ing kube.Ingress) (*translation.TranslateContext, error) { switch ing.GroupVersion() { case kube.IngressV1: diff --git a/pkg/providers/k8s/endpoint/base.go b/pkg/providers/k8s/endpoint/base.go index 91d462af35..de45f75352 100644 --- a/pkg/providers/k8s/endpoint/base.go +++ b/pkg/providers/k8s/endpoint/base.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/intstr" listerscorev1 "k8s.io/client-go/listers/core/v1" "github.com/apache/apisix-ingress-controller/pkg/config" @@ -78,7 +79,7 @@ func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpo clusters := c.APISIX.ListClusters() for _, port := range svc.Spec.Ports { for _, subset := range subsets { - nodes, err := c.translator.TranslateEndpoint(ep, port.Port, subset.Labels) + nodes, err := c.translator.TranslateEndpoint(ep, intstr.FromInt(int(port.Port)), subset.Labels) if err != nil { log.Errorw("failed to translate upstream nodes", zap.Error(err), @@ -109,7 +110,7 @@ func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpo clusters := c.APISIX.ListClusters() for _, port := range svc.Spec.Ports { for _, subset := range subsets { - nodes, err := c.translator.TranslateEndpoint(ep, port.Port, subset.Labels) + nodes, err := c.translator.TranslateEndpoint(ep, intstr.FromInt(int(port.Port)), subset.Labels) if err != nil { log.Errorw("failed to translate upstream nodes", zap.Error(err), diff --git a/pkg/providers/translation/apisix_upstream.go b/pkg/providers/translation/apisix_upstream.go index 9a57b566c8..ee9a42d133 100644 --- a/pkg/providers/translation/apisix_upstream.go +++ b/pkg/providers/translation/apisix_upstream.go @@ -42,24 +42,23 @@ func (t *translator) TranslateUpstreamConfigV2beta3(au *configv2beta3.ApisixUpst return ups, nil } -func (t *translator) TranslateUpstreamConfigV2(au *configv2.ApisixUpstreamConfig) (*apisixv1.Upstream, error) { - ups := apisixv1.NewDefaultUpstream() +func (t *translator) TranslateUpstreamConfigV2(au *configv2.ApisixUpstreamConfig, ups *apisixv1.Upstream) error { if err := t.translateUpstreamScheme(au.Scheme, ups); err != nil { - return nil, err + return err } if err := t.translateUpstreamLoadBalancerV2(au.LoadBalancer, ups); err != nil { - return nil, err + return err } if err := t.translateUpstreamHealthCheckV2(au.HealthCheck, ups); err != nil { - return nil, err + return err } if err := t.translateUpstreamRetriesAndTimeoutV2(au.Retries, au.Timeout, ups); err != nil { - return nil, err + return err } if err := t.translateClientTLSV2(au.TLSSecret, ups); err != nil { - return nil, err + return err } - return ups, nil + return nil } func (t *translator) translateUpstreamRetriesAndTimeoutV2beta3(retries *int, timeout *configv2beta3.UpstreamTimeout, ups *apisixv1.Upstream) error { diff --git a/pkg/providers/translation/translator.go b/pkg/providers/translation/translator.go index 0627b6d00b..4b877e8339 100644 --- a/pkg/providers/translation/translator.go +++ b/pkg/providers/translation/translator.go @@ -17,6 +17,8 @@ package translation import ( "fmt" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" listerscorev1 "k8s.io/client-go/listers/core/v1" "github.com/apache/apisix-ingress-controller/pkg/kube" @@ -46,7 +48,13 @@ type Translator interface { TranslateUpstreamConfigV2beta3(*configv2beta3.ApisixUpstreamConfig) (*apisixv1.Upstream, error) // TranslateUpstreamConfigV2 translates ApisixUpstreamConfig (part of ApisixUpstream) // to APISIX Upstream, it doesn't fill the the Upstream metadata and nodes. - TranslateUpstreamConfigV2(*configv2.ApisixUpstreamConfig) (*apisixv1.Upstream, error) + TranslateUpstreamConfigV2(*configv2.ApisixUpstreamConfig, *apisixv1.Upstream) error + // TranslateService translates the K8s Service to APISIX Upstream nodes (cluster ip) + TranslateService(*corev1.Service, intstr.IntOrString) (apisixv1.UpstreamNodes, error) + // TranslateEndpoint translates the K8s Endpoint to APISIX Upstream nodes (pod ip) + // according to the given port. Extra labels can be passed to filter the ultimate + // upstream nodes. + TranslateEndpoint(kube.Endpoint, intstr.IntOrString, types.Labels) (apisixv1.UpstreamNodes, error) // TranslateUpstream composes an upstream according to the // given namespace, name (searching Service/Endpoints) and port (filtering Endpoints). // The returned Upstream doesn't have metadata info. @@ -56,11 +64,12 @@ type Translator interface { // matching the subset labels (defined in ApisixUpstream) will be selected. // When the subset is not found, the node list will be empty. When the subset is empty, // all pods IP will be filled. - TranslateService(string, string, string, int32) (*apisixv1.Upstream, error) - // TranslateUpstreamNodes translate Endpoints resources to APISIX Upstream nodes - // according to the give port. Extra labels can be passed to filter the ultimate + // ResolveGranularity determines the granularity of the generated nodes of upstream. It supports service/endpoint. + TranslateUpstream(namespace string, name string, subset string, resolveGranularity string, port intstr.IntOrString) (*apisixv1.Upstream, error) + // TranslateUpstreamNodes translates the K8s Endpoint to APISIX Upstream nodes + // according to the given port. Extra labels can be passed to filter the ultimate // upstream nodes. - TranslateEndpoint(kube.Endpoint, int32, types.Labels) (apisixv1.UpstreamNodes, error) + TranslateUpstreamNodes(namespace string, name string, resolveGranularity string, port intstr.IntOrString, labels types.Labels) (apisixv1.UpstreamNodes, error) } // TranslatorOptions contains options to help Translator diff --git a/pkg/providers/translation/translator_test.go b/pkg/providers/translation/translator_test.go index 3bc0614848..96cad302a6 100644 --- a/pkg/providers/translation/translator_test.go +++ b/pkg/providers/translation/translator_test.go @@ -107,7 +107,8 @@ func TestTranslateUpstreamConfigV2(t *testing.T) { Scheme: apisixv1.SchemeGRPC, } - ups, err := tr.TranslateUpstreamConfigV2(au) + ups := apisixv1.NewDefaultUpstream() + err := tr.TranslateUpstreamConfigV2(au, ups) assert.Nil(t, err, "checking upstream config translating") assert.Equal(t, apisixv1.LbRoundRobin, ups.Type) assert.Equal(t, apisixv1.SchemeGRPC, ups.Scheme) @@ -120,7 +121,7 @@ func TestTranslateUpstreamConfigV2(t *testing.T) { }, Scheme: apisixv1.SchemeHTTP, } - ups, err = tr.TranslateUpstreamConfigV2(au) + err = tr.TranslateUpstreamConfigV2(au, ups) assert.Nil(t, err, "checking upstream config translating") assert.Equal(t, apisixv1.LbConsistentHash, ups.Type) assert.Equal(t, "user-agent", ups.Key) @@ -135,7 +136,7 @@ func TestTranslateUpstreamConfigV2(t *testing.T) { }, Scheme: "dns", } - _, err = tr.TranslateUpstreamConfigV2(au) + err = tr.TranslateUpstreamConfigV2(au, ups) assert.Error(t, err, &TranslateError{ Field: "scheme", Reason: "invalid value", @@ -146,7 +147,7 @@ func TestTranslateUpstreamConfigV2(t *testing.T) { Type: "hash", }, } - _, err = tr.TranslateUpstreamConfigV2(au) + err = tr.TranslateUpstreamConfigV2(au, ups) assert.Error(t, err, &TranslateError{ Field: "loadbalancer.type", Reason: "invalid value", @@ -158,7 +159,7 @@ func TestTranslateUpstreamConfigV2(t *testing.T) { HashOn: "arg", }, } - _, err = tr.TranslateUpstreamConfigV2(au) + err = tr.TranslateUpstreamConfigV2(au, ups) assert.Error(t, err, &TranslateError{ Field: "loadbalancer.hashOn", Reason: "invalid value", @@ -244,14 +245,14 @@ func TestTranslateUpstreamNodes(t *testing.T) { }} <-processCh - nodes, err := tr.TranslateEndpoint(kube.NewEndpoint(endpoints), 10080, nil) + nodes, err := tr.TranslateEndpoint(kube.NewEndpoint(endpoints), intstr.FromInt(10080), nil) assert.Nil(t, nodes) assert.Equal(t, &TranslateError{ - Field: "service.spec.ports", - Reason: "port not defined", + Field: "service.Spec.Ports", + Reason: "service.Spec.Ports: port 10080 not found", }, err) - nodes, err = tr.TranslateEndpoint(kube.NewEndpoint(endpoints), 80, nil) + nodes, err = tr.TranslateEndpoint(kube.NewEndpoint(endpoints), intstr.FromInt(80), nil) assert.Nil(t, err) assert.Equal(t, apisixv1.UpstreamNodes{ { @@ -266,7 +267,7 @@ func TestTranslateUpstreamNodes(t *testing.T) { }, }, nodes) - nodes, err = tr.TranslateEndpoint(kube.NewEndpoint(endpoints), 443, nil) + nodes, err = tr.TranslateEndpoint(kube.NewEndpoint(endpoints), intstr.FromInt(443), nil) assert.Nil(t, err) assert.Equal(t, apisixv1.UpstreamNodes{ { @@ -373,14 +374,14 @@ func TestTranslateUpstreamNodesWithEndpointSlices(t *testing.T) { }} <-processCh - nodes, err := tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), 10080, nil) + nodes, err := tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), intstr.FromInt(10080), nil) assert.Nil(t, nodes) assert.Equal(t, err, &TranslateError{ - Field: "service.spec.ports", - Reason: "port not defined", + Field: "service.Spec.Ports", + Reason: "service.Spec.Ports: port 10080 not found", }) - nodes, err = tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), 80, nil) + nodes, err = tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), intstr.FromInt(80), nil) assert.Nil(t, err) assert.Equal(t, apisixv1.UpstreamNodes{ { @@ -395,7 +396,7 @@ func TestTranslateUpstreamNodesWithEndpointSlices(t *testing.T) { }, }, nodes) - nodes, err = tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), 443, nil) + nodes, err = tr.TranslateEndpoint(kube.NewEndpointWithSlice(ep), intstr.FromInt(443), nil) assert.Nil(t, err) assert.Equal(t, apisixv1.UpstreamNodes{ { diff --git a/pkg/providers/translation/service.go b/pkg/providers/translation/upstream.go similarity index 60% rename from pkg/providers/translation/service.go rename to pkg/providers/translation/upstream.go index 2c097eaa68..9766b9d6aa 100644 --- a/pkg/providers/translation/service.go +++ b/pkg/providers/translation/upstream.go @@ -17,41 +17,48 @@ package translation import ( + "errors" "fmt" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/intstr" "github.com/apache/apisix-ingress-controller/pkg/config" + "github.com/apache/apisix-ingress-controller/pkg/id" "github.com/apache/apisix-ingress-controller/pkg/kube" "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) -func (t *translator) TranslateService(namespace, name, subset string, port int32) (*apisixv1.Upstream, error) { - endpoint, err := t.EndpointLister.GetEndpoint(namespace, name) +func (t *translator) TranslateUpstream(namespace, name, subset, resolveGranularity string, port intstr.IntOrString) (*apisixv1.Upstream, error) { + svc, err := t.ServiceLister.Services(namespace).Get(name) if err != nil { - return nil, &TranslateError{ - Field: "endpoints", - Reason: err.Error(), - } + return nil, err } - + svcPort, err := t.parseServicePort(svc, port) + if err != nil { + return nil, err + } + port = intstr.FromInt(int(svcPort.Port)) switch t.APIVersion { case config.ApisixV2beta3: - return t.translateUpstreamV2beta3(&endpoint, namespace, name, subset, port) + return t.translateUpstreamV2beta3(namespace, name, subset, port, resolveGranularity) case config.ApisixV2: - return t.translateUpstreamV2(&endpoint, namespace, name, subset, port) + return t.translateUpstreamV2(namespace, name, subset, port, resolveGranularity) default: panic(fmt.Errorf("unsupported ApisixUpstream version %v", t.APIVersion)) } } -func (t *translator) translateUpstreamV2(ep *kube.Endpoint, namespace, name, subset string, port int32) (*apisixv1.Upstream, error) { - au, err := t.ApisixUpstreamLister.V2(namespace, name) +func (t *translator) translateUpstreamV2(namespace, name, subset string, port intstr.IntOrString, resolveGranularity string) (*apisixv1.Upstream, error) { ups := apisixv1.NewDefaultUpstream() + ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal, resolveGranularity) + ups.ID = id.GenID(ups.Name) + + au, err := t.ApisixUpstreamLister.V2(namespace, name) if err != nil { if k8serrors.IsNotFound(err) { // If subset in ApisixRoute is not empty but the ApisixUpstream resource not found, @@ -77,33 +84,33 @@ func (t *translator) translateUpstreamV2(ep *kube.Endpoint, namespace, name, sub } } // Filter nodes by subset. - nodes, err := t.TranslateEndpoint(*ep, port, labels) + ups.Nodes, err = t.TranslateUpstreamNodes(namespace, name, resolveGranularity, port, labels) if err != nil { return nil, err } if au == nil || au.V2().Spec == nil { - ups.Nodes = nodes return ups, nil } upsCfg := &au.V2().Spec.ApisixUpstreamConfig for _, pls := range au.V2().Spec.PortLevelSettings { - if pls.Port == port { + if pls.Port == port.IntVal { upsCfg = &pls.ApisixUpstreamConfig break } } - ups, err = t.TranslateUpstreamConfigV2(upsCfg) - if err != nil { + if err := t.TranslateUpstreamConfigV2(upsCfg, ups); err != nil { return nil, err } - ups.Nodes = nodes return ups, nil } -func (t *translator) translateUpstreamV2beta3(ep *kube.Endpoint, namespace, name, subset string, port int32) (*apisixv1.Upstream, error) { - au, err := t.ApisixUpstreamLister.V2beta3(namespace, name) +func (t *translator) translateUpstreamV2beta3(namespace, name, subset string, port intstr.IntOrString, resolveGranularity string) (*apisixv1.Upstream, error) { ups := apisixv1.NewDefaultUpstream() + ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal, resolveGranularity) + ups.ID = id.GenID(ups.Name) + + au, err := t.ApisixUpstreamLister.V2beta3(namespace, name) if err != nil { if k8serrors.IsNotFound(err) { // If subset in ApisixRoute is not empty but the ApisixUpstream resource not found, @@ -119,21 +126,7 @@ func (t *translator) translateUpstreamV2beta3(ep *kube.Endpoint, namespace, name } } } - if err != nil { - if k8serrors.IsNotFound(err) { - // If subset in ApisixRoute is not empty but the ApisixUpstream resource not found, - // just set an empty node list. - if subset != "" { - ups.Nodes = apisixv1.UpstreamNodes{} - return ups, nil - } - } else { - return nil, &TranslateError{ - Field: "ApisixUpstream", - Reason: err.Error(), - } - } - } + var labels types.Labels if subset != "" { for _, ss := range au.V2beta3().Spec.Subsets { @@ -144,7 +137,7 @@ func (t *translator) translateUpstreamV2beta3(ep *kube.Endpoint, namespace, name } } // Filter nodes by subset. - nodes, err := t.TranslateEndpoint(*ep, port, labels) + nodes, err := t.TranslateUpstreamNodes(namespace, name, resolveGranularity, port, labels) if err != nil { return nil, err } @@ -155,7 +148,7 @@ func (t *translator) translateUpstreamV2beta3(ep *kube.Endpoint, namespace, name upsCfg := &au.V2beta3().Spec.ApisixUpstreamConfig for _, pls := range au.V2beta3().Spec.PortLevelSettings { - if pls.Port == port { + if pls.Port == port.IntVal { upsCfg = &pls.ApisixUpstreamConfig break } @@ -165,10 +158,12 @@ func (t *translator) translateUpstreamV2beta3(ep *kube.Endpoint, namespace, name return nil, err } ups.Nodes = nodes + ups.Name = apisixv1.ComposeUpstreamName(namespace, name, subset, port.IntVal, resolveGranularity) + ups.ID = id.GenID(ups.Name) return ups, nil } -func (t *translator) TranslateEndpoint(endpoint kube.Endpoint, port int32, labels types.Labels) (apisixv1.UpstreamNodes, error) { +func (t *translator) TranslateEndpoint(endpoint kube.Endpoint, port intstr.IntOrString, labels types.Labels) (apisixv1.UpstreamNodes, error) { namespace, err := endpoint.Namespace() if err != nil { log.Errorw("failed to get endpoint namespace", @@ -185,18 +180,11 @@ func (t *translator) TranslateEndpoint(endpoint kube.Endpoint, port int32, label Reason: err.Error(), } } - - var svcPort *corev1.ServicePort - for _, exposePort := range svc.Spec.Ports { - if exposePort.Port == port { - svcPort = &exposePort - break - } - } - if svcPort == nil { + svcPort, err := t.parseServicePort(svc, port) + if err != nil { return nil, &TranslateError{ - Field: "service.spec.ports", - Reason: "port not defined", + Field: "service.Spec.Ports", + Reason: err.Error(), } } // As nodes is not optional, here we create an empty slice, @@ -217,6 +205,44 @@ func (t *translator) TranslateEndpoint(endpoint kube.Endpoint, port int32, label return nodes, nil } +func (t *translator) TranslateService(svc *corev1.Service, port intstr.IntOrString) (apisixv1.UpstreamNodes, error) { + if svc == nil { + return nil, errors.New("service should not be empty") + } + if svc.Spec.ClusterIP == "" { + return nil, errors.New("conflict headless service and backend resolve granularity") + } + svcPort, err := t.parseServicePort(svc, port) + if err != nil { + return nil, err + } + return apisixv1.UpstreamNodes{ + { + Host: svc.Spec.ClusterIP, + Port: int(svcPort.Port), + Weight: DefaultWeight, + }, + }, nil +} + +func (t *translator) TranslateUpstreamNodes(namespace, name, resolveGranularity string, port intstr.IntOrString, labels types.Labels) (apisixv1.UpstreamNodes, error) { + nodes := make(apisixv1.UpstreamNodes, 0) + switch resolveGranularity { + case "service": + svc, err := t.ServiceLister.Services(namespace).Get(name) + if err != nil { + return nil, err + } + return t.TranslateService(svc, port) + default: + ep, err := t.EndpointLister.GetEndpoint(namespace, name) + if err != nil { + return nodes, nil + } + return t.TranslateEndpoint(ep, port, labels) + } +} + func (t *translator) filterNodesByLabels(nodes apisixv1.UpstreamNodes, labels types.Labels, namespace string) apisixv1.UpstreamNodes { if labels == nil { return nodes @@ -246,3 +272,24 @@ func (t *translator) filterNodesByLabels(nodes apisixv1.UpstreamNodes, labels ty } return filteredNodes } + +func (t *translator) parseServicePort(svc *corev1.Service, port intstr.IntOrString) (*corev1.ServicePort, error) { + if svc == nil { + return nil, fmt.Errorf("service does not exist") + } + if port.Type == intstr.String { + for _, p := range svc.Spec.Ports { + if p.Name == port.StrVal { + return &p, nil + } + } + return nil, fmt.Errorf("service.Spec.Ports: port %s not found", port.StrVal) + } + for _, p := range svc.Spec.Ports { + if p.Port == port.IntVal { + return &p, nil + } + } + return nil, fmt.Errorf("service.Spec.Ports: port %d not found", port.IntVal) + +}