Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSDK-9756-draft #4737

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 39 additions & 19 deletions components/camera/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewClientFromConn(
streamClient := streampb.NewStreamServiceClient(conn)
trackClosed := make(chan struct{})
close(trackClosed)
return &client{
cc := &client{
remoteName: remoteName,
Named: name.PrependRemote(remoteName).AsNamed(),
name: name.ShortName(),
Expand All @@ -95,7 +95,9 @@ func NewClientFromConn(
trackClosed: trackClosed,
associatedSubs: map[int][]rtppassthrough.SubscriptionID{},
logger: logger,
}, nil
}
logger.Infof("%p cameraClient %#v", c, c)
return cc, nil
}

func (c *client) Stream(
Expand Down Expand Up @@ -369,6 +371,7 @@ func (c *client) SubscribeRTP(
bufferSize int,
packetsCB rtppassthrough.PacketCallback,
) (rtppassthrough.Subscription, error) {
c.logger.Infof("%p SubscribeRTP called on camera client: %#v", c, c)
ctx, span := trace.StartSpan(ctx, "camera::client::SubscribeRTP")
defer span.End()
// RSDK-6340: The resource manager closes remote resources when the underlying
Expand All @@ -395,16 +398,20 @@ func (c *client) SubscribeRTP(
// with those from the new "generation".
healthyClientCh := c.maybeResetHealthyClientCh()

c.logger.Infof("%p SubscribeRTP calling rtpPassthroughMu", c)
defer c.logger.Infof("%p SubscribeRTP released rtpPassthroughMu", c)
c.rtpPassthroughMu.Lock()
defer c.rtpPassthroughMu.Unlock()
c.logger.Infof("%p SubscribeRTP has rtpPassthroughMu", c)

// Create a Subscription object and allocate a ring buffer of RTP packets.
sub, rtpPacketBuffer, err := rtppassthrough.NewSubscription(bufferSize)
if err != nil {
return sub, err
}
c.logger.Infof("%p SubscribeRTP got subscription", c)
g := utils.NewGuard(func() {
c.logger.CDebug(ctx, "Error subscribing to RTP. Closing passthrough buffer.")
c.logger.CDebugf(ctx, "%p Error subscribing to RTP. Closing passthrough buffer.", c)
rtpPacketBuffer.Close()
})
defer g.OnFail()
Expand All @@ -425,14 +432,13 @@ func (c *client) SubscribeRTP(
// for which video stream. The `grpc.Tracker` API is for manipulating that map.
tracker, ok := c.conn.(grpc.Tracker)
if !ok {
c.logger.CErrorw(ctx, "Client conn is not a `Tracker`", "connType", fmt.Sprintf("%T", c.conn))
c.logger.CErrorf(ctx, "%p Client conn is not a `Tracker` connType %T", c, c.conn)
return rtppassthrough.NilSubscription, ErrNoSharedPeerConnection
}

c.logger.CDebugw(ctx, "SubscribeRTP", "subID", sub.ID.String(), "name", c.Name(), "bufAndCBByID", c.bufAndCBToString())
c.logger.CDebugf(ctx, "%p SubscribeRTP subID: %s, name: %s, bufAndCBByID: %s", c, sub.ID.String(), c.Name(), c.bufAndCBToString())
defer func() {
c.logger.CDebugw(ctx, "SubscribeRTP after", "subID", sub.ID.String(),
"name", c.Name(), "bufAndCBByID", c.bufAndCBToString())
c.logger.CDebugf(ctx, "%p SubscribeRTP after subID: %s, name: %s, bufAndCBByID: %s", c, sub.ID.String(), c.Name(), c.bufAndCBToString())
}()

// If we do not currently have a video stream open for this camera, we create one. Otherwise
Expand All @@ -443,13 +449,13 @@ func (c *client) SubscribeRTP(
// the same camera when the remote receives `RemoveStream`, it wouldn't know which to stop
// sending data for.
if len(c.runningStreams) == 0 {
c.logger.CInfow(ctx, "SubscribeRTP is creating a video track",
"client", fmt.Sprintf("%p", c), "peerConnection", fmt.Sprintf("%p", c.conn.PeerConn()))
c.logger.CInfof(ctx, "%p SubscribeRTP is creating a video track, peerConnection: %p", c, c.conn.PeerConn())
// A previous subscriber/track may have exited, but its resources have not necessarily been
// cleaned up. We must wait for that to complete. We additionally select on other error
// conditions.
select {
case <-c.trackClosed:
c.logger.Infof("%p SubscribeRTP track closed", c)
case <-ctx.Done():
return rtppassthrough.NilSubscription, fmt.Errorf("track not closed within SubscribeRTP provided context %w", ctx.Err())
case <-healthyClientCh:
Expand All @@ -467,32 +473,38 @@ func (c *client) SubscribeRTP(
c.associatedSubs[c.subGenerationID] = []rtppassthrough.SubscriptionID{}

// Add the camera's addOnTrackSubFunc to the SharedConn's map of OnTrack callbacks.
c.logger.Infof("%p SubscribeRTP calling AddOnTrackSub on %p, %#v", c, tracker, tracker)
tracker.AddOnTrackSub(c.trackName(), c.addOnTrackFunc(healthyClientCh, trackReceived, trackClosed, c.subGenerationID))
// Remove the OnTrackSub once we either fail or succeed.
defer tracker.RemoveOnTrackSub(c.trackName())

// Call `AddStream` on the remote. In the successful case, this will result in a
// PeerConnection renegotiation to add this camera's video track and have the `OnTrack`
// callback invoked.
if _, err := c.streamClient.AddStream(ctx, &streampb.AddStreamRequest{Name: c.trackName()}); err != nil {
c.logger.CDebugw(ctx, "SubscribeRTP AddStream hit error", "subID", sub.ID.String(), "trackName", c.trackName(), "err", err)
c.logger.Infof("%p SubscribeRTP calling AddStream on %s", c, c.trackName())

if _, err := c.streamClient.AddStream(
ctx,
&streampb.AddStreamRequest{Name: c.trackName()},
); err != nil {
c.logger.CDebugf(ctx, "%p SubscribeRTP AddStream hit error subID: %s, trackName: %s, err: %s", c, sub.ID.String(), c.trackName(), err.Error())
return rtppassthrough.NilSubscription, err
}

c.logger.CDebugw(ctx, "SubscribeRTP waiting for track", "client", fmt.Sprintf("%p", c), "pc", fmt.Sprintf("%p", c.conn.PeerConn()))
c.logger.CDebugf(ctx, "%p SubscribeRTP waiting for track pc: %p", c, c.conn.PeerConn())
select {
case <-ctx.Done():
return rtppassthrough.NilSubscription, fmt.Errorf("track not received within SubscribeRTP provided context %w", ctx.Err())
case <-healthyClientCh:
return rtppassthrough.NilSubscription, errors.New("track not received before client closed")
case <-trackReceived:
c.logger.CDebugw(ctx, "SubscribeRTP received track data", "client", fmt.Sprintf("%p", c), "pc", fmt.Sprintf("%p", c.conn.PeerConn()))
c.logger.CDebugf(ctx, "%p SubscribeRTP received track data pc: %p", c, c.conn.PeerConn())
}

// Set up channel to detect when the track has closed. This can happen in response to an
// event / error internal to the peer or due to calling `RemoveStream`.
c.trackClosed = trackClosed
c.logger.CDebugw(ctx, "SubscribeRTP called AddStream and succeeded", "subID", sub.ID.String())
c.logger.CDebugf(ctx, "%p SubscribeRTP called AddStream and succeeded subID: %s", c, sub.ID.String())
}

// Associate this subscription with the current generation.
Expand All @@ -508,17 +520,18 @@ func (c *client) SubscribeRTP(
// Start the goroutine that copies RTP packets
rtpPacketBuffer.Start()
g.Success()
c.logger.CDebugw(ctx, "SubscribeRTP succeeded", "subID", sub.ID.String(),
"name", c.Name(), "bufAndCBByID", c.bufAndCBToString())
c.logger.CDebugf(ctx, "%p SubscribeRTP succeeded, subID: %s, name: %s, bufAndCBByID: %s", c, sub.ID.String(), c.Name(), c.bufAndCBToString())
return sub, nil
}

func (c *client) addOnTrackFunc(
healthyClientCh, trackReceived, trackClosed chan struct{},
generationID int,
) grpc.OnTrackCB {
c.logger.Infof("%p %s addOnTrackFunc called, pc: %p", c, c.Name(), c.conn.PeerConn())
// This is invoked when `PeerConnection.OnTrack` is called for this camera's stream id.
return func(tr *webrtc.TrackRemote, r *webrtc.RTPReceiver) {
c.logger.Infof("%p %s addOnTrackFunc callback called, pc: %p, tr: %p, r: %p", c, c.Name(), c.conn.PeerConn(), tr, r)
// Our `OnTrack` was called. Inform `SubscribeRTP` that getting video data was successful.
close(trackReceived)
c.activeBackgroundWorkers.Add(1)
Expand All @@ -545,7 +558,7 @@ func (c *client) addOnTrackFunc(
pkt, _, err := tr.ReadRTP()
if os.IsTimeout(err) {
c.logger.Debugw("ReadRTP read timeout", "generationId", generationID,
"err", err, "timeout", readRTPTimeout.String())
"err", err, "timeout", readRTPTimeout.String(), "pc", fmt.Sprintf("%p", c.conn.PeerConn()))
continue
}

Expand All @@ -571,7 +584,10 @@ func (c *client) addOnTrackFunc(
// This is needed to prevent the problem described here:
// https://go.dev/blog/loopvar-preview
bufAndCB := tmp
err := bufAndCB.buf.Publish(func() { bufAndCB.cb(pkt) })
err := bufAndCB.buf.Publish(func() {
c.logger.Infof("%p %s received packet from pc %p", c, c.Name(), c.conn.PeerConn())
bufAndCB.cb(pkt)
})
if err != nil {
c.logger.Debugw("rtp passthrough packet dropped",
"generationId", generationID, "err", err)
Expand Down Expand Up @@ -629,7 +645,11 @@ func (c *client) Unsubscribe(ctx context.Context, id rtppassthrough.Subscription
request := &streampb.RemoveStreamRequest{Name: c.trackName()}
// We assume the server responds with a success if the requested `Name` is unknown/already
// removed.
if _, err := c.streamClient.RemoveStream(ctx, request); err != nil {

if _, err := c.streamClient.RemoveStream(
ctx,
request,
); err != nil {
c.logger.CWarnw(ctx, "Unsubscribe RemoveStream returned err", "trackName",
c.trackName(), "subID", id.String(), "err", err)
c.rtpPassthroughMu.Unlock()
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,5 @@ require (
github.com/ziutek/mymysql v1.5.4 // indirect
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e
)

replace go.viam.com/utils => /Users/nicksanford/code/goutils
1 change: 1 addition & 0 deletions gostream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func (bs *basicStream) Start() {
// if we also need to support writing audio RTP packets, we should split
// this method into WriteVideoRTP and WriteAudioRTP.
func (bs *basicStream) WriteRTP(pkt *rtp.Packet) error {
bs.logger.Info("basic stream calling WriteRTP")
return bs.videoTrackLocal.rtpTrack.WriteRTP(pkt)
}

Expand Down
5 changes: 4 additions & 1 deletion grpc/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ func (c *ReconfigurableClientConn) ReplaceConn(conn rpc.ClientConn) {
if c.onTrackCBByTrackName == nil {
c.onTrackCBByTrackName = make(map[string]OnTrackCB)
}

logging.Global().Infof("ReconfigurableClientConn.ReplaceConn START: %p, %#v, pc: %p, %#v", c, c, conn.PeerConn(), conn.PeerConn())
defer logging.Global().Infof("ReconfigurableClientConn.ReplaceConn END: %p, %#v, pc: %p, %#v", c, c, conn.PeerConn(), conn.PeerConn())
if pc := conn.PeerConn(); pc != nil {
logging.Global().Infof("ReconfigurableClientConn.ReplaceConn installing OnTrack on %p, %#v, pc: %p, %#v", c, c, conn.PeerConn(), conn.PeerConn())
pc.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
logging.Global().Infof("OnTrack called on pc: %p, %#v, tr: %p %#v, receiver: %p, %#v", pc, pc, trackRemote, trackRemote, rtpReceiver, rtpReceiver)
c.onTrackCBByTrackNameMu.Lock()
onTrackCB, ok := c.onTrackCBByTrackName[trackRemote.StreamID()]
c.onTrackCBByTrackNameMu.Unlock()
Expand Down
61 changes: 56 additions & 5 deletions grpc/shared_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
googlegrpc "google.golang.org/grpc"

"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
rutils "go.viam.com/rdk/utils"
)

Expand Down Expand Up @@ -71,10 +72,9 @@ type SharedConn struct {
// `peerConnMu` synchronizes changes to the underlying `peerConn`. Such that calls consecutive
// calls to `GrpcConn` and `PeerConn` will return connections from the same (or newer, but not
// prior) "generations".
peerConnMu sync.Mutex
peerConn *webrtc.PeerConnection
peerConnReady <-chan struct{}
peerConnClosed <-chan struct{}
peerConnMu sync.Mutex
peerConn *webrtc.PeerConnection
peerConnReady <-chan struct{}
// peerConnFailed gets closed when a PeerConnection fails to connect. The peerConn pointer is
// set to nil before this channel is closed.
peerConnFailed chan struct{}
Expand All @@ -85,6 +85,49 @@ type SharedConn struct {
logger logging.Logger
}

func NewSharedConn(grpcConn rpc.ClientConn, peerConn *webrtc.PeerConnection, logger logging.Logger) *SharedConn {
// We must be passed a ready connection.
pcReady := make(chan struct{})
close(pcReady)

ret := &SharedConn{
peerConn: peerConn,
peerConnReady: pcReady,
// We were passed in a ready connection. Only create this for when `Close` is called.
peerConnFailed: make(chan struct{}),
onTrackCBByTrackName: make(map[string]OnTrackCB),
logger: logger,
}
ret.grpcConn.ReplaceConn(grpcConn)
ret.logger.Infof("OnTrack installed on %p", peerConn)
ret.peerConn.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
ret.logger.Info("OnTrack called:", trackRemote.StreamID())
ret.onTrackCBByTrackNameMu.Lock()
onTrackCB, ok := ret.onTrackCBByTrackName[trackRemote.StreamID()]
if !ok {
// HACK please forgive
for name, cb := range ret.onTrackCBByTrackName {
if n, err := resource.NewFromString(name); err == nil {
if n.SDPTrackName() == trackRemote.StreamID() {
onTrackCB = cb
ok = true
break
}
}
}
}
ret.onTrackCBByTrackNameMu.Unlock()
if !ok {
msg := "Callback not found for StreamID: %s, keys(resOnTrackCBs): %#v"
ret.logger.Errorf(msg, trackRemote.StreamID(), maps.Keys(ret.onTrackCBByTrackName))
return
}
onTrackCB(trackRemote, rtpReceiver)
})

return ret
}

// Invoke forwards to the underlying GRPC Connection.
func (sc *SharedConn) Invoke(
ctx context.Context,
Expand All @@ -107,8 +150,12 @@ func (sc *SharedConn) NewStream(

// AddOnTrackSub adds an OnTrack subscription for the track.
func (sc *SharedConn) AddOnTrackSub(trackName string, onTrackCB OnTrackCB) {
sc.logger.Infof("sharedConn: %p, pc: %p AddOnTrackSub called on %s", sc, sc.peerConn, trackName)
sc.onTrackCBByTrackNameMu.Lock()
defer sc.onTrackCBByTrackNameMu.Unlock()
if _, ok := sc.onTrackCBByTrackName[trackName]; ok {
sc.logger.Infof("track callback replaced %s", trackName)
}
sc.onTrackCBByTrackName[trackName] = onTrackCB
}

Expand Down Expand Up @@ -150,6 +197,7 @@ func (sc *SharedConn) PeerConn() *webrtc.PeerConnection {
// happens. But subequent calls can be entirely asynchronous to components/services accessing
// `SharedConn` for connection objects.
func (sc *SharedConn) ResetConn(conn rpc.ClientConn, moduleLogger logging.Logger) {
moduleLogger.Infof("ResetConn called on %p, sharedConn: %p", conn, sc)
sc.grpcConn.ReplaceConn(conn)
if sc.logger == nil {
// The first call to `ResetConn` happens before anything can access `sc.logger`. So long as
Expand Down Expand Up @@ -192,14 +240,17 @@ func (sc *SharedConn) ResetConn(conn rpc.ClientConn, moduleLogger logging.Logger
return
}

sc.logger.Infof("shared conn: %p, peerConnection: %p", sc, peerConn)
sc.peerConn = peerConn
sc.peerConnReady, sc.peerConnClosed, err = rpc.ConfigureForRenegotiation(peerConn, rpc.PeerRoleClient, sc.logger)
sc.peerConnReady, _, err = rpc.ConfigureForRenegotiation(peerConn, rpc.PeerRoleClient, sc.logger)
if err != nil {
sc.logger.Warnw("Unable to create optional renegotiation channel for module. Ignoring.", "err", err)
return
}

sc.logger.Warnf("Setting OnTrack on pc: %p", sc.peerConn)
sc.peerConn.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
sc.logger.Warnf("OnTrack called on pc: %p, %#v, trackRemote: %p, %#v, rtpReceiver: %p, %#v", sc.peerConn, sc.peerConn, trackRemote, trackRemote, rtpReceiver, rtpReceiver)
sc.onTrackCBByTrackNameMu.Lock()
onTrackCB, ok := sc.onTrackCBByTrackName[trackRemote.StreamID()]
sc.onTrackCBByTrackNameMu.Unlock()
Expand Down
11 changes: 11 additions & 0 deletions module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.viam.com/rdk/operation"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot/packages"
"go.viam.com/rdk/robot/web"
rutils "go.viam.com/rdk/utils"
)

Expand All @@ -59,9 +60,11 @@ var (
// NewManager returns a Manager.
func NewManager(
ctx context.Context, parentAddr string, logger logging.Logger, options modmanageroptions.Options,
m web.ModularResourceToPeerConnectionMapper,
) modmaninterface.ModuleManager {
restartCtx, restartCtxCancel := context.WithCancel(ctx)
ret := &Manager{
m: m,
logger: logger.Sublogger("modmanager"),
modules: moduleMap{},
parentAddr: parentAddr,
Expand Down Expand Up @@ -165,6 +168,7 @@ func (rmap *resourceModuleMap) Range(f func(name resource.Name, mod *module) boo

// Manager is the root structure for the module system.
type Manager struct {
m web.ModularResourceToPeerConnectionMapper
// mu (mostly) coordinates API methods that modify the `modules` map. Specifically,
// `AddResource` can be called concurrently during a reconfigure. But `RemoveResource` or
// resources being restarted after a module crash are given exclusive access.
Expand Down Expand Up @@ -522,6 +526,7 @@ func (mgr *Manager) closeModule(mod *module, reconfigure bool) error {
mgr.rMap.Range(func(r resource.Name, m *module) bool {
if m == mod {
mgr.rMap.Delete(r)
mgr.m.ClearResourceNameToModulePeerConnection(r.String())
}
return true
})
Expand Down Expand Up @@ -562,6 +567,9 @@ func (mgr *Manager) addResource(ctx context.Context, conf resource.Config, deps
return nil, err
}
mgr.rMap.Store(conf.ResourceName(), mod)
if pc := mod.sharedConn.PeerConn(); pc != nil {
mgr.m.SetResourceNameToModulePeerConnection(conf.ResourceName().String(), pc)
}

mod.resourcesMu.Lock()
defer mod.resourcesMu.Unlock()
Expand Down Expand Up @@ -636,6 +644,7 @@ func (mgr *Manager) RemoveResource(ctx context.Context, name resource.Name) erro
mod.logger.CInfow(ctx, "Removing resource for module", "resource", name.String(), "module", mod.cfg.Name)

mgr.rMap.Delete(name)
mgr.m.ClearResourceNameToModulePeerConnection(name.String())
delete(mod.resources, name)
_, err := mod.client.RemoveResource(ctx, &pb.RemoveResourceRequest{Name: name.String()})
if err != nil {
Expand Down Expand Up @@ -916,6 +925,7 @@ func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) b
mod.logger.Warnw("Error while re-adding resource to module",
"resource", name, "module", mod.cfg.Name, "error", err)
mgr.rMap.Delete(name)
mgr.m.ClearResourceNameToModulePeerConnection(name.String())

mod.resourcesMu.Lock()
delete(mod.resources, name)
Expand Down Expand Up @@ -1366,6 +1376,7 @@ func (m *module) cleanupAfterCrash(mgr *Manager) {
mgr.rMap.Range(func(r resource.Name, mod *module) bool {
if mod == m {
mgr.rMap.Delete(r)
mgr.m.ClearResourceNameToModulePeerConnection(r.String())
}
return true
})
Expand Down
Loading
Loading