Skip to content

Commit

Permalink
Merge pull request #3273 from telepresenceio/knlambert/add-consumption
Browse files Browse the repository at this point in the history
Add consumption state to the traffic-manager
  • Loading branch information
Kévin LAMBERT authored Jul 31, 2023
2 parents 20d9880 + de55ed4 commit 6fd53bd
Show file tree
Hide file tree
Showing 20 changed files with 453 additions and 48 deletions.
3 changes: 2 additions & 1 deletion build-aux/main.mk
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ release-binary: $(TELEPRESENCE)
tel2-image: build-deps
mkdir -p $(BUILDDIR)
printf $(TELEPRESENCE_VERSION) > $(BUILDDIR)/version.txt ## Pass version in a file instead of a --build-arg to maximize cache usage
docker build --target tel2 --tag tel2 --tag $(TELEPRESENCE_REGISTRY)/tel2:$(patsubst v%,%,$(TELEPRESENCE_VERSION)) -f build-aux/docker/images/Dockerfile.traffic .
$(eval PLATFORM_ARG := $(if $(TELEPRESENCE_TEL2_IMAGE_PLATFORM), --platform=$(TELEPRESENCE_TEL2_IMAGE_PLATFORM),))
docker build $(PLATFORM_ARG) --target tel2 --tag tel2 --tag $(TELEPRESENCE_REGISTRY)/tel2:$(patsubst v%,%,$(TELEPRESENCE_VERSION)) -f build-aux/docker/images/Dockerfile.traffic .

.PHONY: client-image
client-image: build-deps
Expand Down
5 changes: 4 additions & 1 deletion cmd/traffic/cmd/manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,13 @@ func (s *service) GetClientConfig(ctx context.Context, _ *empty.Empty) (*rpc.CLI
func (s *service) Remain(ctx context.Context, req *rpc.RemainRequest) (*empty.Empty, error) {
// ctx = WithSessionInfo(ctx, req.GetSession())
// dlog.Debug(ctx, "Remain called")
sessionID := req.GetSession().GetSessionId()
if ok := s.state.MarkSession(req, s.clock.Now()); !ok {
return nil, status.Errorf(codes.NotFound, "Session %q not found", req.GetSession().GetSessionId())
return nil, status.Errorf(codes.NotFound, "Session %q not found", sessionID)
}

s.state.RefreshSessionConsumptionMetrics(sessionID)

return &empty.Empty{}, nil
}

Expand Down
93 changes: 93 additions & 0 deletions cmd/traffic/cmd/manager/state/consumption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package state

import (
"context"
"time"

"github.com/telepresenceio/telepresence/v2/pkg/tunnel"
)

// SessionConsumptionMetricsStaleTTL is the duration after which a session's metrics are considered stale,
// meaning that they should no longer be updated since the user is not really using Telepresence at the moment.
const SessionConsumptionMetricsStaleTTL = 60 * time.Minute

// NewSessionConsumptionMetrics creates the metrics holder for a session. The connect
// duration starts at zero, one byte counter probe is created per traffic direction,
// and LastUpdate is initialised to the current time.
func NewSessionConsumptionMetrics() *SessionConsumptionMetrics {
	scm := new(SessionConsumptionMetrics)
	scm.ConnectDuration = 0
	scm.FromClientBytes = tunnel.NewCounterProbe("FromClientBytes")
	scm.ToClientBytes = tunnel.NewCounterProbe("ToClientBytes")
	scm.LastUpdate = time.Now()
	return scm
}

// SessionConsumptionMetrics holds the usage figures tracked for a single session:
// how long it has been connected and how many bytes flowed in each direction.
type SessionConsumptionMetrics struct {
// ConnectDuration is the accumulated connect time, in seconds (it is incremented
// with the number of seconds elapsed since LastUpdate on each refresh).
ConnectDuration uint32
// LastUpdate is the reference time used by RefreshSessionConsumptionMetrics to
// compute the time elapsed since the previous refresh.
LastUpdate time.Time

// data from client to the traffic manager.
FromClientBytes *tunnel.CounterProbe
// data from the traffic manager to the client.
ToClientBytes *tunnel.CounterProbe
}

// RunCollect starts the collection loop of both byte probes, each in its own goroutine.
// The given context is handed to every probe; presumably it bounds the lifetime of the
// collection loops (confirm against tunnel.CounterProbe).
func (s *SessionConsumptionMetrics) RunCollect(ctx context.Context) {
	probes := []*tunnel.CounterProbe{s.FromClientBytes, s.ToClientBytes}
	for _, probe := range probes {
		// The receiver is evaluated when the go statement executes, so every
		// goroutine is bound to its own probe.
		go probe.RunCollect(ctx)
	}
}

// Close closes both byte probes, first the client-to-manager one and then the
// manager-to-client one.
func (s *SessionConsumptionMetrics) Close() {
	probes := []*tunnel.CounterProbe{s.FromClientBytes, s.ToClientBytes}
	for _, probe := range probes {
		probe.Close()
	}
}

// GetSessionConsumptionMetrics returns the consumption metrics of the client session
// registered under sessionID. It returns nil when no session has that ID, or when the
// session exists but is not a client session (for example an agent session).
func (s *state) GetSessionConsumptionMetrics(sessionID string) *SessionConsumptionMetrics {
	s.mu.RLock()
	defer s.mu.RUnlock()

	// Look the session up directly instead of scanning every entry of the map. A missing
	// key yields a nil interface value, for which the type assertion simply reports false.
	if css, ok := s.sessions[sessionID].(*clientSessionState); ok {
		return css.ConsumptionMetrics()
	}
	return nil
}

// GetAllSessionConsumptionMetrics returns the consumption metrics of every client session,
// keyed by session ID. Sessions that are not client sessions are left out. The returned map
// is freshly allocated, but its values are the live metrics objects held by the sessions.
func (s *state) GetAllSessionConsumptionMetrics() map[string]*SessionConsumptionMetrics {
	result := make(map[string]*SessionConsumptionMetrics)

	s.mu.RLock()
	defer s.mu.RUnlock()

	for id, sess := range s.sessions {
		css, isClient := sess.(*clientSessionState)
		if !isClient {
			continue
		}
		result[id] = css.ConsumptionMetrics()
	}
	return result
}

// RefreshSessionConsumptionMetrics refreshes the metrics associated to a specific session.
// RefreshSessionConsumptionMetrics refreshes the metrics associated to a specific session.
//
// Nothing happens when sessionID is unknown, when it does not designate a client session,
// or when the session carries no metrics. Otherwise the time elapsed since the metrics'
// LastUpdate is added, in whole seconds, to ConnectDuration, unless the session is stale.
func (s *state) RefreshSessionConsumptionMetrics(sessionID string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// A single assertion covers both the "unknown session" case (nil interface) and
	// the "not a client session" case.
	css, ok := s.sessions[sessionID].(*clientSessionState)
	if !ok {
		return
	}
	scm := css.ConsumptionMetrics()
	if scm == nil {
		return
	}

	// Sample the clock once so that every computation below uses the same instant.
	now := time.Now()

	// If the last mark is older than the SessionConsumptionMetricsStaleTTL, it indicates that the duration
	// metric should no longer be updated, as the user's machine may be in standby.
	if now.After(css.LastMarked().Add(SessionConsumptionMetricsStaleTTL)) {
		// Skip the idle period entirely: counting restarts from now.
		scm.LastUpdate = now
		return
	}

	elapsed := now.Sub(scm.LastUpdate)
	if elapsed <= 0 {
		// LastUpdate lies in the future (e.g. the wall clock was adjusted); resynchronise
		// rather than converting a negative duration to an unsigned counter.
		scm.LastUpdate = now
		return
	}

	// Only account for whole seconds, and advance LastUpdate by exactly the amount that was
	// accounted for. The sub-second remainder is thereby carried over to the next refresh
	// instead of being dropped on every call, which would make ConnectDuration drift.
	whole := elapsed.Truncate(time.Second)
	scm.ConnectDuration += uint32(whole / time.Second)
	scm.LastUpdate = scm.LastUpdate.Add(whole)
}
41 changes: 41 additions & 0 deletions cmd/traffic/cmd/manager/state/consumption_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package state

import (
"time"

"github.com/stretchr/testify/assert"
)

// TestRefreshSessionConsumptionMetrics verifies that refreshing the consumption metrics
// increases the connect duration of an active client session, is a no-op for an agent
// session, and leaves the connect duration of a stale client session untouched.
func (s *suiteState) TestRefreshSessionConsumptionMetrics() {
	// given
	now := time.Now()
	session1 := &clientSessionState{}
	session1.SetLastMarked(now)
	session3 := &clientSessionState{}
	session3.SetLastMarked(now.Add(-24 * time.Hour * 30))
	s.state.sessions["session-1"] = session1
	s.state.sessions["session-2"] = &agentSessionState{}
	s.state.sessions["session-3"] = session3
	session1.consumptionMetrics = &SessionConsumptionMetrics{
		ConnectDuration: 42,
		LastUpdate:      now.Add(-time.Minute),
	}
	// stale metric
	session3.consumptionMetrics = &SessionConsumptionMetrics{
		ConnectDuration: 36,
		LastUpdate:      session3.lastMarked,
	}

	// when
	s.state.RefreshSessionConsumptionMetrics("session-1")
	s.state.RefreshSessionConsumptionMetrics("session-2") // should not fail.
	s.state.RefreshSessionConsumptionMetrics("session-3") // should not refresh a stale metric.

	// then
	ccs1 := s.state.sessions["session-1"].(*clientSessionState)
	ccs3 := s.state.sessions["session-3"].(*clientSessionState)

	// Only the two client sessions carry consumption metrics.
	assert.Len(s.T(), s.state.GetAllSessionConsumptionMetrics(), 2)
	// Greater/Equal report the actual values on failure, unlike True on a comparison.
	assert.Greater(s.T(), ccs1.ConsumptionMetrics().ConnectDuration, uint32(42))
	assert.Equal(s.T(), uint32(36), ccs3.ConsumptionMetrics().ConnectDuration)
}
38 changes: 35 additions & 3 deletions cmd/traffic/cmd/manager/state/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ import (
"github.com/telepresenceio/telepresence/v2/pkg/tunnel"
)

// AgentSessionIDPrefix is the prefix that AddAgent puts in front of a generated UUID
// to form the session ID of a traffic-agent.
const AgentSessionIDPrefix = "agent:"

type SessionState interface {
Cancel()
AwaitingBidiMapOwnerSessionID(stream tunnel.Stream) string
Done() <-chan struct{}
LastMarked() time.Time
SetLastMarked(lastMarked time.Time)
Dials() <-chan *rpc.DialRequest
EstablishBidiPipe(context.Context, tunnel.Stream) (tunnel.Endpoint, error)
OnConnect(context.Context, tunnel.Stream, *int32) (tunnel.Endpoint, error)
OnConnect(context.Context, tunnel.Stream, *int32, *SessionConsumptionMetrics) (tunnel.Endpoint, error)
}

type awaitingBidiPipe struct {
Expand Down Expand Up @@ -86,12 +89,27 @@ func (ss *sessionState) EstablishBidiPipe(ctx context.Context, stream tunnel.Str
}
}

// AwaitingBidiMapOwnerSessionID returns the session ID of the stream that is registered
// as awaiting a BidiPipe under the ID of the given stream. It returns the empty string
// when no stream is waiting under that ID.
func (ss *sessionState) AwaitingBidiMapOwnerSessionID(stream tunnel.Stream) string {
	ss.Lock()
	defer ss.Unlock()

	waiting, found := ss.awaitingBidiPipeMap[stream.ID()]
	if !found {
		return ""
	}
	return waiting.stream.SessionID()
}

// OnConnect checks if a stream is waiting for the given stream to arrive in order to create a BidiPipe.
// If that's the case, the BidiPipe is created, started, and returned by both this method and the EstablishBidiPipe
// method that registered the waiting stream. Otherwise, this method returns nil.
func (ss *sessionState) OnConnect(_ context.Context, stream tunnel.Stream, counter *int32) (tunnel.Endpoint, error) {
func (ss *sessionState) OnConnect(
ctx context.Context,
stream tunnel.Stream,
counter *int32,
consumptionMetrics *SessionConsumptionMetrics,
) (tunnel.Endpoint, error) {
id := stream.ID()
ss.Lock()
// abp is a session corresponding to an end user machine
abp, ok := ss.awaitingBidiPipeMap[id]
if ok {
delete(ss.awaitingBidiPipeMap, id)
Expand All @@ -102,7 +120,13 @@ func (ss *sessionState) OnConnect(_ context.Context, stream tunnel.Stream, count
return nil, nil
}
name := fmt.Sprintf("%s: session %s -> %s", id, abp.stream.SessionID(), stream.SessionID())
bidiPipe := tunnel.NewBidiPipe(abp.stream, stream, name, counter)
tunnelProbes := &tunnel.BidiPipeProbes{}
if consumptionMetrics != nil {
tunnelProbes.BytesProbeA = consumptionMetrics.FromClientBytes
tunnelProbes.BytesProbeB = consumptionMetrics.ToClientBytes
}

bidiPipe := tunnel.NewBidiPipe(abp.stream, stream, name, counter, tunnelProbes)
bidiPipe.Start(abp.ctx)

defer close(abp.bidiPipeCh)
Expand Down Expand Up @@ -148,12 +172,20 @@ func newSessionState(ctx context.Context, now time.Time) sessionState {
// clientSessionState is the session state kept for a client session. It extends the
// common sessionState with a tunnel pool and the session's consumption metrics.
type clientSessionState struct {
sessionState
pool *tunnel.Pool

// consumptionMetrics is exposed through the ConsumptionMetrics method.
consumptionMetrics *SessionConsumptionMetrics
}

// ConsumptionMetrics returns the consumption metrics attached to this client session.
// The returned pointer is the session's own metrics object, not a copy.
func (css *clientSessionState) ConsumptionMetrics() *SessionConsumptionMetrics {
return css.consumptionMetrics
}

// newClientSessionState builds the state of a new client session: a base session state
// created from ctx and ts, a fresh tunnel pool, and a fresh set of consumption metrics.
func newClientSessionState(ctx context.Context, ts time.Time) *clientSessionState {
	css := new(clientSessionState)
	css.sessionState = newSessionState(ctx, ts)
	css.pool = tunnel.NewPool()
	css.consumptionMetrics = NewSessionConsumptionMetrics()
	return css
}

Expand Down
50 changes: 46 additions & 4 deletions cmd/traffic/cmd/manager/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type State interface {
GetAgent(string) *rpc.AgentInfo
GetAllClients() map[string]*rpc.ClientInfo
GetClient(string) *rpc.ClientInfo
GetSession(string) SessionState
GetSessionConsumptionMetrics(string) *SessionConsumptionMetrics
GetAllSessionConsumptionMetrics() map[string]*SessionConsumptionMetrics
GetIntercept(string) (*rpc.InterceptInfo, bool)
MarkSession(*rpc.RemainRequest, time.Time) bool
NewInterceptInfo(string, *rpc.SessionInfo, *rpc.CreateInterceptRequest) *rpc.InterceptInfo
Expand All @@ -54,6 +57,7 @@ type State interface {
Tunnel(context.Context, tunnel.Stream) error
UpdateIntercept(string, func(*rpc.InterceptInfo)) *rpc.InterceptInfo
UpdateClient(sessionID string, apply func(*rpc.ClientInfo)) *rpc.ClientInfo
RefreshSessionConsumptionMetrics(sessionID string)
ValidateAgentImage(string, bool) error
WaitForTempLogLevel(rpc.Manager_WatchLogLevelServer) error
WatchAgents(context.Context, func(sessionID string, agent *rpc.AgentInfo) bool) <-chan watchable.Snapshot[*rpc.AgentInfo]
Expand Down Expand Up @@ -200,6 +204,12 @@ func (s *state) MarkSession(req *rpc.RemainRequest, now time.Time) (ok bool) {
return false
}

// GetSession returns the state of the session registered under sessionID, or nil
// when there is no such session.
func (s *state) GetSession(sessionID string) SessionState {
	s.mu.RLock()
	sess := s.sessions[sessionID]
	s.mu.RUnlock()
	return sess
}

// RemoveSession removes a session from the set of present session IDs.
func (s *state) RemoveSession(ctx context.Context, sessionID string) {
s.mu.Lock()
Expand Down Expand Up @@ -257,6 +267,10 @@ func (s *state) unlockedRemoveSession(sessionID string) {
s.clients.Delete(sessionID)
}

if css, ok := sess.(*clientSessionState); ok {
defer css.ConsumptionMetrics().Close()
}

delete(s.sessions, sessionID)
}
}
Expand Down Expand Up @@ -314,7 +328,13 @@ func (s *state) addClient(sessionID string, client *rpc.ClientInfo, now time.Tim
if oldClient, hasConflict := s.clients.LoadOrStore(sessionID, client); hasConflict {
panic(fmt.Errorf("duplicate id %q, existing %+v, new %+v", sessionID, oldClient, client))
}

s.sessions[sessionID] = newClientSessionState(s.ctx, now)

if css, ok := s.sessions[sessionID].(*clientSessionState); ok {
css.ConsumptionMetrics().RunCollect(s.ctx)
}

return sessionID
}

Expand Down Expand Up @@ -356,7 +376,7 @@ func (s *state) AddAgent(agent *rpc.AgentInfo, now time.Time) string {
s.mu.Lock()
defer s.mu.Unlock()

sessionID := "agent:" + uuid.New().String()
sessionID := AgentSessionIDPrefix + uuid.New().String()
if oldAgent, hasConflict := s.agents.LoadOrStore(sessionID, agent); hasConflict {
panic(fmt.Errorf("duplicate id %q, existing %+v, new %+v", sessionID, oldAgent, agent))
}
Expand Down Expand Up @@ -604,7 +624,26 @@ func (s *state) Tunnel(ctx context.Context, stream tunnel.Stream) error {
return status.Errorf(codes.NotFound, "Session %q not found", sessionID)
}

bidiPipe, err := ss.OnConnect(ctx, stream, &s.tunnelCounter)
var scm *SessionConsumptionMetrics
switch sst := ss.(type) {
case *agentSessionState:
// If it's an agent, find the associated clientSessionState.
if clientSessionID := sst.AwaitingBidiMapOwnerSessionID(stream); clientSessionID != "" {
s.mu.RLock()
as := s.sessions[clientSessionID] // get awaiting state
s.mu.RUnlock()
if as != nil { // if found
if css, isClient := as.(*clientSessionState); isClient {
scm = css.ConsumptionMetrics()
}
}
}
case *clientSessionState:
scm = sst.ConsumptionMetrics()
default:
}

bidiPipe, err := ss.OnConnect(ctx, stream, &s.tunnelCounter, scm)
if err != nil {
return err
}
Expand All @@ -619,7 +658,7 @@ func (s *state) Tunnel(ctx context.Context, stream tunnel.Stream) error {
// The session is either the telepresence client or a traffic-agent.
//
// A client will want to extend the tunnel to a dialer in an intercepted traffic-agent or, if no
// intercept is active, to a dialer here in the traffic-agent.
// intercept is active, to a dialer here in the traffic-manager.
//
// A traffic-agent must always extend the tunnel to the client that it is currently intercepted
// by, and hence, start by sending the sessionID of that client on the tunnel.
Expand Down Expand Up @@ -651,7 +690,10 @@ func (s *state) Tunnel(ctx context.Context, stream tunnel.Stream) error {
return err
}
} else {
endPoint = tunnel.NewDialer(stream, func() {})
if css, isClient := ss.(*clientSessionState); isClient {
scm = css.ConsumptionMetrics()
}
endPoint = tunnel.NewDialer(stream, func() {}, scm.FromClientBytes, scm.ToClientBytes)
endPoint.Start(ctx)
}
<-endPoint.Done()
Expand Down
Loading

0 comments on commit 6fd53bd

Please sign in to comment.