Skip to content

Commit

Permalink
gh-14 Refine pipeline metrics interface
Browse files Browse the repository at this point in the history
  • Loading branch information
gavv committed Feb 5, 2024
1 parent b771eb7 commit 8b27edc
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 63 deletions.
23 changes: 16 additions & 7 deletions src/internal_modules/roc_pipeline/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "roc_audio/latency_tuner.h"
#include "roc_core/stddefs.h"
#include "roc_packet/ilink_meter.h"
#include "roc_packet/units.h"

namespace roc {
namespace pipeline {
Expand All @@ -33,15 +34,19 @@ struct SenderParticipantMetrics {

//! Sender-side metrics of the whole slot.
struct SenderSlotMetrics {
//! Is slot configuration complete.
bool is_complete;
//! Slot source ID.
packet::stream_source_t source_id;

//! Number of receivers connected to sender slot.
//! Number of participants (remote receivers) connected to slot.
size_t num_participants;

//! Is slot configuration complete (all endpoints bound).
bool is_complete;

SenderSlotMetrics()
: is_complete(false)
, num_participants(0) {
: source_id(0)
, num_participants(0)
, is_complete(false) {
}
};

Expand All @@ -59,11 +64,15 @@ struct ReceiverParticipantMetrics {

//! Receiver-side metrics of the whole slot.
struct ReceiverSlotMetrics {
//! Number of senders connected to receiver slot.
//! Slot source ID.
packet::stream_source_t source_id;

//! Number of participants (remote senders) connected to slot.
size_t num_participants;

ReceiverSlotMetrics()
: num_participants(0) {
: source_id(0)
, num_participants(0) {
}
};

Expand Down
32 changes: 19 additions & 13 deletions src/internal_modules/roc_pipeline/receiver_session_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,30 +141,36 @@ void ReceiverSessionGroup::reclock_sessions(core::nanoseconds_t playback_time) {
}
}

size_t ReceiverSessionGroup::num_participants() const {
size_t ReceiverSessionGroup::num_sessions() const {
roc_panic_if(!is_valid());

return sessions_.size();
}

void ReceiverSessionGroup::get_participant_metrics(
ReceiverParticipantMetrics* party_metrics, size_t* party_count) const {
void ReceiverSessionGroup::get_slot_metrics(ReceiverSlotMetrics& slot_metrics) const {
roc_panic_if(!is_valid());

roc_panic_if_not(party_metrics);
roc_panic_if_not(party_count);
slot_metrics.source_id = identity_->ssrc();
slot_metrics.num_participants = sessions_.size();
}

*party_count = std::min(*party_count, sessions_.size());
void ReceiverSessionGroup::get_participant_metrics(
ReceiverParticipantMetrics* party_metrics, size_t* party_count) const {
roc_panic_if(!is_valid());

size_t n = 0;
if (party_metrics && party_count) {
*party_count = std::min(*party_count, sessions_.size());

for (core::SharedPtr<ReceiverSession> sess = sessions_.front(); sess;
sess = sessions_.nextof(*sess)) {
if (n == *party_count) {
break;
size_t n_part = 0;
for (core::SharedPtr<ReceiverSession> sess = sessions_.front(); sess;
sess = sessions_.nextof(*sess)) {
if (n_part == *party_count) {
break;
}
party_metrics[n_part++] = sess->get_metrics();
}
party_metrics[n] = sess->get_metrics();
n++;
} else if (party_count) {
*party_count = 0;
}
}

Expand Down
17 changes: 12 additions & 5 deletions src/internal_modules/roc_pipeline/receiver_session_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,21 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
//! retrieved from pipeline will be actually played on sink
void reclock_sessions(core::nanoseconds_t playback_time);

//! Get number of remote participants.
//! On receiver, one participant corresponds to one ReceiverSession inside
//! ReceiverSessionGroup, because we create a separate session for every
//! connected participant (remote sender).
size_t num_participants() const;
//! Get number of sessions in group.
size_t num_sessions() const;

//! Get slot metrics.
//! @remarks
//! These metrics are for the whole slot.
//! For metrics for specific participant, see get_participant_metrics().
void get_slot_metrics(ReceiverSlotMetrics& slot_metrics) const;

//! Get metrics for remote participants.
//! @remarks
//! On receiver, one participant corresponds to one ReceiverSession inside
//! ReceiverSessionGroup, because we create a separate session for every
//! connected participant (remote sender).
//! @note
//! @p party_metrics points to array of metrics structs, and @p party_count
//! defines number of array elements. Metrics are written to given array,
//! and @p party_count is updated of actual number of elements written.
Expand Down
9 changes: 3 additions & 6 deletions src/internal_modules/roc_pipeline/receiver_slot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,18 @@ void ReceiverSlot::reclock(core::nanoseconds_t playback_time) {
size_t ReceiverSlot::num_sessions() const {
roc_panic_if(!is_valid());

return session_group_.num_participants();
return session_group_.num_sessions();
}

void ReceiverSlot::get_metrics(ReceiverSlotMetrics& slot_metrics,
ReceiverParticipantMetrics* party_metrics,
size_t* party_count) const {
roc_panic_if(!is_valid());

slot_metrics = ReceiverSlotMetrics();
slot_metrics.num_participants = session_group_.num_participants();
session_group_.get_slot_metrics(slot_metrics);

if (party_metrics && party_count) {
if (party_metrics || party_count) {
session_group_.get_participant_metrics(party_metrics, party_count);
} else if (party_count) {
*party_count = 0;
}
}

Expand Down
21 changes: 9 additions & 12 deletions src/internal_modules/roc_pipeline/sender_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,31 +263,28 @@ core::nanoseconds_t SenderSession::refresh(core::nanoseconds_t current_time) {
return 0;
}

size_t SenderSession::num_participants() const {
void SenderSession::get_slot_metrics(SenderSlotMetrics& slot_metrics) const {
roc_panic_if(!is_valid());

if (feedback_monitor_) {
return feedback_monitor_->num_participants();
} else {
return 0;
}
slot_metrics.source_id = identity_->ssrc();
slot_metrics.num_participants =
feedback_monitor_ ? feedback_monitor_->num_participants() : 0;
slot_metrics.is_complete = (frame_writer_ != NULL);
}

void SenderSession::get_participant_metrics(SenderParticipantMetrics* party_metrics,
size_t* party_count) const {
roc_panic_if(!is_valid());

roc_panic_if_not(party_metrics);
roc_panic_if_not(party_count);

if (feedback_monitor_) {
*party_count = std::min(*party_count, feedback_monitor_->num_participants());
if (party_metrics && party_count) {
*party_count = std::min(
*party_count, feedback_monitor_ ? feedback_monitor_->num_participants() : 0);

for (size_t n_part = 0; n_part < *party_count; n_part++) {
party_metrics[n_part].link = feedback_monitor_->link_metrics(n_part);
party_metrics[n_part].latency = feedback_monitor_->latency_metrics(n_part);
}
} else {
} else if (party_count) {
*party_count = 0;
}
}
Expand Down
14 changes: 9 additions & 5 deletions src/internal_modules/roc_pipeline/sender_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,18 @@ class SenderSession : public core::NonCopyable<>, private rtcp::IParticipant {
//! if there are no frames
core::nanoseconds_t refresh(core::nanoseconds_t current_time);

//! Get number of remote participants.
//! On sender, all participants corresponds to a single SenderSession.
//! In case of unicast, there is only one participant (remote receiver),
//! but in case of multicast, multiple participants may be present.
size_t num_participants() const;
//! Get slot metrics.
//! @remarks
//! These metrics are for the whole slot.
//! For metrics for specific participant, see get_participant_metrics().
void get_slot_metrics(SenderSlotMetrics& slot_metrics) const;

//! Get metrics for remote participants.
//! @remarks
//! On sender, all participants corresponds to a single SenderSession.
//! In case of unicast, there is only one participant (remote receiver),
//! but in case of multicast, multiple participants may be present.
//! @note
//! @p party_metrics points to array of metrics structs, and @p party_count
//! defines number of array elements. Metrics are written to given array,
//! and @p party_count is updated of actual number of elements written.
Expand Down
12 changes: 2 additions & 10 deletions src/internal_modules/roc_pipeline/sender_slot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ bool SenderSlot::is_valid() const {
return valid_;
}

bool SenderSlot::is_complete() const {
return session_.frame_writer() != NULL;
}

SenderEndpoint* SenderSlot::add_endpoint(address::Interface iface,
address::Protocol proto,
const address::SocketAddr& outbound_address,
Expand Down Expand Up @@ -155,14 +151,10 @@ void SenderSlot::get_metrics(SenderSlotMetrics& slot_metrics,
size_t* party_count) const {
roc_panic_if(!is_valid());

slot_metrics = SenderSlotMetrics();
slot_metrics.is_complete = is_complete();
slot_metrics.num_participants = session_.num_participants();
session_.get_slot_metrics(slot_metrics);

if (party_metrics && party_count) {
if (party_metrics || party_count) {
session_.get_participant_metrics(party_metrics, party_count);
} else if (party_count) {
*party_count = 0;
}
}

Expand Down
3 changes: 0 additions & 3 deletions src/internal_modules/roc_pipeline/sender_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ class SenderSlot : public core::RefCounted<SenderSlot, core::ArenaAllocation>,
//! Check if the slot was succefully constructed.
bool is_valid() const;

//! Check if slot configuration is complete.
bool is_complete() const;

//! Add endpoint.
SenderEndpoint* add_endpoint(address::Interface iface,
address::Protocol proto,
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_rtcp/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ status::StatusCode Communicator::generate_reports(core::nanoseconds_t current_ti
}

do {
// TODO(gh-14): use IntervalComputer
// TODO(gh-674): use IntervalComputer
next_deadline_ += core::Millisecond * 200;
} while (next_deadline_ <= current_time);

Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_rtcp/reporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
#include "roc_rtcp/headers.h"
#include "roc_rtcp/iparticipant.h"
#include "roc_rtcp/loss_estimator.h"
#include "roc_rtcp/packet_counter.h"
#include "roc_rtcp/reports.h"
#include "roc_rtcp/rtt_estimator.h"
#include "roc_rtcp/sdes.h"
#include "roc_rtcp/packet_counter.h"
#include "roc_status/status_code.h"

namespace roc {
Expand Down

0 comments on commit 8b27edc

Please sign in to comment.