Skip to content

Commit

Permalink
Merge pull request #3436 from telepresenceio/thallgren/unify-by-targe…
Browse files Browse the repository at this point in the history
…tport

Fix problem with multiple service ports using the same target port
  • Loading branch information
thallgren authored Nov 24, 2023
2 parents cea3ac5 + ae1afe9 commit 81cb2b1
Show file tree
Hide file tree
Showing 23 changed files with 323 additions and 77 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ items:
- version: 2.17.1
date: (TBD)
notes:
- type: bugfix
title: Multiple services ports using the same target port would not get intercepted correctly.
body: Intercepts didn't work when multiple service ports were using the same container port. Telepresence would
think that one of the ports wasn't intercepted and therefore disable the intercept of the container port.
- type: bugfix
title: Root daemon refuses to disconnect.
body: The root daemon would sometimes hang forever when attempting to disconnect due to a deadlock in
Expand Down
4 changes: 1 addition & 3 deletions cmd/traffic/cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,7 @@ func sidecar(ctx context.Context, s SimpleState, info *rpc.AgentInfo) error {
return fwd.Serve(tunnel.WithPool(ctx, tunnel.NewPool()), nil)
})
cnMountPoint := filepath.Join(agentconfig.ExportsMountPoint, filepath.Base(cn.MountPoint))
for _, ic := range ics {
s.AddInterceptState(s.NewInterceptState(fwd, ic, cnMountPoint, env))
}
s.AddInterceptState(s.NewInterceptState(fwd, NewInterceptTarget(ics), cnMountPoint, env))
}
}
TalkToManagerLoop(ctx, s, info)
Expand Down
8 changes: 8 additions & 0 deletions cmd/traffic/cmd/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
dns2 "github.com/miekg/dns"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
empty "google.golang.org/protobuf/types/known/emptypb"

"github.com/datawire/dlib/dgroup"
Expand Down Expand Up @@ -193,6 +195,12 @@ func handleInterceptLoop(ctx context.Context, snapshots <-chan *rpc.InterceptInf
for _, review := range reviews {
review.Session = session
if _, err := manager.ReviewIntercept(ctx, review); err != nil {
if status.Code(err) == codes.NotFound {
// An intercept may be removed after a snapshot has arrived and before
// the next snapshot. This is not an error. We can safely assume that
// a new snapshot will arrive.
continue
}
return err
}
}
Expand Down
7 changes: 3 additions & 4 deletions cmd/traffic/cmd/agent/fwdstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,22 @@ import (

"github.com/datawire/dlib/dlog"
"github.com/telepresenceio/telepresence/rpc/v2/manager"
"github.com/telepresenceio/telepresence/v2/pkg/agentconfig"
"github.com/telepresenceio/telepresence/v2/pkg/forwarder"
"github.com/telepresenceio/telepresence/v2/pkg/restapi"
"github.com/telepresenceio/telepresence/v2/pkg/tunnel"
)

type fwdState struct {
*simpleState
intercept *agentconfig.Intercept
intercept InterceptTarget
forwarder forwarder.Interceptor
mountPoint string
env map[string]string
}

// NewInterceptState creates a InterceptState that performs intercepts by using an Interceptor which indiscriminately
// intercepts all traffic to the port that it forwards.
func (s *simpleState) NewInterceptState(forwarder forwarder.Interceptor, intercept *agentconfig.Intercept, mountPoint string, env map[string]string) InterceptState {
func (s *simpleState) NewInterceptState(forwarder forwarder.Interceptor, intercept InterceptTarget, mountPoint string, env map[string]string) InterceptState {
return &fwdState{
simpleState: s,
mountPoint: mountPoint,
Expand All @@ -34,7 +33,7 @@ func (s *simpleState) NewInterceptState(forwarder forwarder.Interceptor, interce
}
}

func (fs *fwdState) InterceptConfig() *agentconfig.Intercept {
func (fs *fwdState) Target() InterceptTarget {
return fs.intercept
}

Expand Down
138 changes: 138 additions & 0 deletions cmd/traffic/cmd/agent/intercepttarget.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package agent

import (
"bytes"
"context"
"fmt"
"strconv"

v1 "k8s.io/api/core/v1"

"github.com/datawire/dlib/dlog"
"github.com/telepresenceio/telepresence/rpc/v2/manager"
"github.com/telepresenceio/telepresence/v2/pkg/agentconfig"
"github.com/telepresenceio/telepresence/v2/pkg/ioutil"
)

// InterceptTarget describes the mapping between service ports and one container port. All entries
// must be guaranteed to all have the same Protocol, ContainerPort, and AgentPort. The slice must
// be considered immutable once created using NewInterceptTarget.
type InterceptTarget []*agentconfig.Intercept

func NewInterceptTarget(ics []*agentconfig.Intercept) InterceptTarget {
// This is a parameter assertion. If it is triggered, then something is dead wrong in the caller code.
ni := len(ics)
if ni == 0 {
panic("attempt to add intercept create an InterceptTarget with no Intercepts")
}
if ni > 1 {
sv := ics[0]
for i := 1; i < ni; i++ {
ic := ics[i]
if sv.AgentPort != ic.AgentPort || sv.ContainerPort != ic.ContainerPort || sv.Protocol != ic.Protocol {
panic("attempt to add intercept to an InterceptTarget with different AgentPort or ContainerPort")
}
}
}
return ics
}

func (cp InterceptTarget) MatchForSpec(spec *manager.InterceptSpec) bool {
for _, sv := range cp {
if agentconfig.SpecMatchesIntercept(spec, sv) {
return true
}
}
return false
}

func (cp InterceptTarget) AgentPort() uint16 {
return cp[0].AgentPort
}

func (cp InterceptTarget) ContainerPort() uint16 {
return cp[0].ContainerPort
}

func (cp InterceptTarget) ContainerPortName() string {
return cp[0].ContainerPortName
}

func (cp InterceptTarget) Protocol() v1.Protocol {
return cp[0].Protocol
}

func (cp InterceptTarget) AppProtocol(ctx context.Context) (proto string) {
var foundSv *agentconfig.Intercept
for _, sv := range cp {
if sv.AppProtocol == "" {
continue
}
if foundSv == nil {
foundSv = sv
proto = foundSv.AppProtocol
} else if foundSv.AppProtocol != sv.AppProtocol {
svcPort := func(s *agentconfig.Intercept) string {
if s.ServicePortName != "" {
return fmt.Sprintf("%s:%s", s.ServiceName, s.ServicePortName)
}
return fmt.Sprintf("%s:%d", s.ServiceName, s.ServicePort)
}
dlog.Warningf(ctx, "port %s appProtocol %s differs from port %s appProtocol %s. %s will be used for container port %d",
svcPort(foundSv), proto,
svcPort(sv), sv.AppProtocol,
proto, sv.ContainerPort)
}
}
return proto
}

func (cp InterceptTarget) HasServicePortName(name string) bool {
for _, sv := range cp {
if sv.ServicePortName == name {
return true
}
}
return false
}

func (cp InterceptTarget) HasServicePort(port uint16) bool {
for _, sv := range cp {
if sv.ServicePort == port {
return true
}
}
return false
}

func (cp InterceptTarget) String() string {
sb := bytes.Buffer{}
l := len(cp)
if l > 1 {
sb.WriteByte('[')
}
for i, sv := range cp {
if i > 0 {
switch l {
case 2:
sb.WriteString(" and ")
case i + 1:
sb.WriteString(", and ")
default:
sb.WriteString(", ")
}
}
sb.WriteString(sv.ServiceName)
sb.WriteByte(':')
if sv.ServicePortName != "" {
sb.WriteString(sv.ServicePortName)
} else {
sb.WriteString(strconv.Itoa(int(sv.ServicePort)))
}
}
if l > 1 {
sb.WriteByte(']')
}
ioutil.Printf(&sb, " => container port %d/%s", cp.ContainerPort(), cp.Protocol())
return sb.String()
}
19 changes: 9 additions & 10 deletions cmd/traffic/cmd/agent/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/datawire/dlib/dlog"
"github.com/telepresenceio/telepresence/rpc/v2/agent"
"github.com/telepresenceio/telepresence/rpc/v2/manager"
"github.com/telepresenceio/telepresence/v2/pkg/agentconfig"
"github.com/telepresenceio/telepresence/v2/pkg/forwarder"
"github.com/telepresenceio/telepresence/v2/pkg/restapi"
"github.com/telepresenceio/telepresence/v2/pkg/tunnel"
Expand All @@ -37,13 +36,13 @@ type State interface {

type SimpleState interface {
State
NewInterceptState(forwarder forwarder.Interceptor, intercept *agentconfig.Intercept, mountPoint string, env map[string]string) InterceptState
NewInterceptState(forwarder forwarder.Interceptor, target InterceptTarget, mountPoint string, env map[string]string) InterceptState
}

// An InterceptState implements what's needed to intercept one port.
// An InterceptState implements what's needed to intercept one target port.
type InterceptState interface {
State
InterceptConfig() *agentconfig.Intercept
Target() InterceptTarget
InterceptInfo(ctx context.Context, callerID, path string, containerPort uint16, headers http.Header) (*restapi.InterceptInfo, error)
}

Expand Down Expand Up @@ -120,10 +119,10 @@ func (s *state) HandleIntercepts(ctx context.Context, iis []*manager.InterceptIn
for _, ist := range s.interceptStates {
ms := make([]*manager.InterceptInfo, 0, len(iis))
for _, ii := range iis {
ic := ist.InterceptConfig()
if agentconfig.SpecMatchesIntercept(ii.Spec, ic) {
dlog.Debugf(ctx, "intercept id %s svc=%q, svcPortId=%q matches config svc=%q, svcPort=%d, protocol=%s",
ii.Id, ii.Spec.ServiceName, ii.Spec.ServicePortIdentifier, ic.ServiceName, ic.ServicePort, ic.Protocol)
ic := ist.Target()
if ic.MatchForSpec(ii.Spec) {
dlog.Debugf(ctx, "intercept id %s svc=%q, svcPortId=%q matches target protocol=%s, agentPort=%d, containerPort=%d",
ii.Id, ii.Spec.ServiceName, ii.Spec.ServicePortIdentifier, ic.Protocol(), ic.AgentPort(), ic.ContainerPort())
ms = append(ms, ii)
}
}
Expand Down Expand Up @@ -153,8 +152,8 @@ func (s *simpleState) HandleIntercepts(ctx context.Context, iis []*manager.Inter

func (s *state) InterceptInfo(ctx context.Context, callerID, path string, containerPort uint16, headers http.Header) (*restapi.InterceptInfo, error) {
for _, is := range s.interceptStates {
ic := is.InterceptConfig()
if containerPort == ic.ContainerPort && ic.Protocol == core.ProtocolTCP {
ic := is.Target()
if containerPort == ic.ContainerPort() && ic.Protocol() == core.ProtocolTCP {
return is.InterceptInfo(ctx, callerID, path, containerPort, headers)
}
}
Expand Down
4 changes: 1 addition & 3 deletions cmd/traffic/cmd/agent/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ func makeFS(t *testing.T, ctx context.Context) (forwarder.Interceptor, agent.Sta
s := agent.NewSimpleState(c)
cn := c.AgentConfig().Containers[0]
cnMountPoint := filepath.Join(agentconfig.ExportsMountPoint, filepath.Base(cn.MountPoint))
for _, intercept := range cn.Intercepts {
s.AddInterceptState(s.NewInterceptState(f, intercept, cnMountPoint, map[string]string{}))
}
s.AddInterceptState(s.NewInterceptState(f, agent.NewInterceptTarget(cn.Intercepts), cnMountPoint, map[string]string{}))
return f, s
}

Expand Down
20 changes: 7 additions & 13 deletions cmd/traffic/cmd/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/datawire/dlib/dgroup"
"github.com/datawire/dlib/dhttp"
"github.com/datawire/dlib/dlog"
"github.com/datawire/k8sapi/pkg/k8sapi"
Expand Down Expand Up @@ -90,16 +89,10 @@ func MainWithEnv(ctx context.Context) error {
}
ctx = k8sapi.WithK8sInterface(ctx, ki)

mgr, ctx, err := NewServiceFunc(ctx)
mgr, g, err := NewServiceFunc(ctx)
if err != nil {
return fmt.Errorf("unable to initialize traffic manager: %w", err)
}
ctx, imgRetErr := WithAgentImageRetrieverFunc(ctx, mutator.RegenerateAgentMaps)

g := dgroup.NewGroup(ctx, dgroup.GroupConfig{
EnableSignalHandling: true,
SoftShutdownTimeout: 5 * time.Second,
})

g.Go("cli-config", mgr.runConfigWatcher)

Expand All @@ -108,11 +101,12 @@ func MainWithEnv(ctx context.Context) error {

g.Go("prometheus", mgr.servePrometheus)

if imgRetErr != nil {
dlog.Errorf(ctx, "unable to initialize agent injector: %v", imgRetErr)
} else {
g.Go("agent-injector", mutator.ServeMutator)
}
g.Go("agent-injector", func(ctx context.Context) error {
if managerutil.GetAgentImageRetriever(ctx) == nil {
return nil
}
return mutator.ServeMutator(ctx)
})

g.Go("session-gc", mgr.runSessionGCLoop)

Expand Down
9 changes: 9 additions & 0 deletions cmd/traffic/cmd/manager/managerutil/agentimage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package managerutil
import (
"context"
"fmt"
"runtime/debug"
"strings"

"github.com/datawire/dlib/dlog"
Expand Down Expand Up @@ -49,12 +50,20 @@ func WithResolvedAgentImageRetriever(ctx context.Context, ir ImageRetriever) con
return context.WithValue(ctx, irKey{}, ir)
}

func GetAgentImageRetriever(ctx context.Context) ImageRetriever {
if ir, ok := ctx.Value(irKey{}).(ImageRetriever); ok {
return ir
}
return nil
}

// GetAgentImage returns the fully qualified name of the traffic-agent image, i.e. "docker.io/tel2:2.7.4",
// or an empty string if no agent image has been configured.
func GetAgentImage(ctx context.Context) string {
if ir, ok := ctx.Value(irKey{}).(ImageRetriever); ok {
return ir.GetImage()
}
// The code isn't doing what it's supposed to do during startup.
dlog.Error(ctx, string(debug.Stack()))
panic("no ImageRetriever has been configured")
}
15 changes: 13 additions & 2 deletions cmd/traffic/cmd/manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
empty "google.golang.org/protobuf/types/known/emptypb"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/datawire/dlib/dgroup"
"github.com/datawire/dlib/dlog"
"github.com/datawire/k8sapi/pkg/k8sapi"
rpc "github.com/telepresenceio/telepresence/rpc/v2/manager"
"github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/cluster"
"github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/config"
"github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/managerutil"
"github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/mutator"
"github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/state"
"github.com/telepresenceio/telepresence/v2/pkg/agentconfig"
"github.com/telepresenceio/telepresence/v2/pkg/dnsproxy"
Expand Down Expand Up @@ -76,18 +78,27 @@ func (wall) Now() time.Time {
return time.Now()
}

func NewService(ctx context.Context) (Service, context.Context, error) {
func NewService(ctx context.Context) (Service, *dgroup.Group, error) {
ret := &service{
clock: wall{},
id: uuid.New().String(),
}

ctx, err := WithAgentImageRetrieverFunc(ctx, mutator.RegenerateAgentMaps)
if err != nil {
dlog.Errorf(ctx, "unable to initialize agent injector: %v", err)
}
ret.configWatcher = config.NewWatcher(managerutil.GetEnv(ctx).ManagerNamespace)
ret.ctx = ctx
// These are context dependent so build them once the pool is up
ret.clusterInfo = cluster.NewInfo(ctx)
ret.state = state.NewStateFunc(ctx)
ret.self = ret
return ret, ctx, nil
g := dgroup.NewGroup(ctx, dgroup.GroupConfig{
EnableSignalHandling: true,
SoftShutdownTimeout: 5 * time.Second,
})
return ret, g, nil
}

func (s *service) SetSelf(self Service) {
Expand Down
Loading

0 comments on commit 81cb2b1

Please sign in to comment.