Skip to content

Commit

Permalink
tooling: Add depacketizer metrics to csv dumps
Browse files Browse the repository at this point in the history
  • Loading branch information
gavv committed Aug 11, 2024
1 parent 73ea30e commit b26f9f0
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 68 deletions.
84 changes: 57 additions & 27 deletions src/internal_modules/roc_audio/depacketizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,29 @@ namespace audio {

namespace {

const core::nanoseconds_t LogInterval = 20 * core::Second;
const core::nanoseconds_t LogInterval = 30 * core::Second;

} // namespace

Depacketizer::Depacketizer(packet::IReader& packet_reader,
IFrameDecoder& payload_decoder,
FrameFactory& frame_factory,
const SampleSpec& sample_spec)
const SampleSpec& sample_spec,
dbgio::CsvDumper* dumper)
: frame_factory_(frame_factory)
, packet_reader_(packet_reader)
, payload_decoder_(payload_decoder)
, sample_spec_(sample_spec)
, stream_ts_(0)
, next_capture_ts_(0)
, valid_capture_ts_(false)
, padding_samples_(0)
, decoded_samples_(0)
, missing_samples_(0)
, fetched_packets_(0)
, dropped_packets_(0)
, late_samples_(0)
, recovered_samples_(0)
, is_started_(false)
, rate_limiter_(LogInterval)
, dumper_(dumper)
, init_status_(status::NoStatus) {
roc_panic_if_msg(!sample_spec_.is_valid() || !sample_spec_.is_raw(),
"depacketizer: required valid sample spec with raw format: %s",
Expand Down Expand Up @@ -128,7 +129,11 @@ status::StatusCode Depacketizer::read(Frame& frame,
}

commit_frame_(frame, frame_samples, frame_stats);

periodic_report_();
if (dumper_) {
dump_();
}

return frame.duration() == requested_duration ? status::StatusOK : status::StatusPart;
}
Expand Down Expand Up @@ -255,7 +260,13 @@ sample_t* Depacketizer::read_decoded_samples_(sample_t* buff_ptr, sample_t* buff
payload_decoder_.read_samples(buff_ptr, requested_samples);

stream_ts_ += (packet::stream_timestamp_t)decoded_samples;
decoded_samples_ += (packet::stream_timestamp_t)decoded_samples;
decoded_samples_ += decoded_samples;
metrics_.decoded_samples += decoded_samples;

if (packet_->has_flags(packet::Packet::FlagRestored)) {
recovered_samples_ += decoded_samples;
metrics_.recovered_samples += decoded_samples;
}

if (decoded_samples < requested_samples) {
payload_decoder_.end_frame();
Expand All @@ -272,11 +283,9 @@ sample_t* Depacketizer::read_missing_samples_(sample_t* buff_ptr, sample_t* buff
memset(buff_ptr, 0, missing_samples * sample_spec_.num_channels() * sizeof(sample_t));

stream_ts_ += (packet::stream_timestamp_t)missing_samples;

if (!is_started_) {
padding_samples_ += (packet::stream_timestamp_t)missing_samples;
} else {
missing_samples_ += (packet::stream_timestamp_t)missing_samples;
missing_samples_ += missing_samples;
if (is_started_) {
metrics_.missing_samples += missing_samples;
}

return (buff_ptr + missing_samples * sample_spec_.num_channels());
Expand Down Expand Up @@ -371,7 +380,6 @@ status::StatusCode Depacketizer::fetch_packet_(size_t requested_samples,

roc_panic_if(!pkt);
packet_ = pkt;
fetched_packets_++;

return code;
}
Expand All @@ -397,7 +405,8 @@ status::StatusCode Depacketizer::start_packet_() {
roc_log(LogTrace, "depacketizer: dropping late packet: stream_ts=%lu pkt_ts=%lu",
(unsigned long)stream_ts_, (unsigned long)pkt_begin);

dropped_packets_++;
late_samples_ += pkt_end - pkt_begin;
metrics_.late_samples += pkt_end - pkt_begin;
metrics_.late_packets++;

payload_decoder_.end_frame();
Expand All @@ -412,10 +421,12 @@ status::StatusCode Depacketizer::start_packet_() {
}

if (!is_started_) {
roc_log(LogDebug, "depacketizer: got first packet: pkt_ts=%lu zeros_before=%lu",
(unsigned long)pkt_begin, (unsigned long)padding_samples_);
roc_log(LogDebug,
"depacketizer: got first packet: start_ts=%lu start_latency=%lu",
(unsigned long)pkt_begin, (unsigned long)missing_samples_);

stream_ts_ = pkt_begin;
missing_samples_ = 0;
is_started_ = true;
}

Expand All @@ -437,6 +448,10 @@ status::StatusCode Depacketizer::start_packet_() {
(unsigned long)stream_ts_, (unsigned long)pkt_begin,
(unsigned long)diff_samples);

late_samples_ += diff_samples;
metrics_.late_samples += diff_samples;
metrics_.late_packets++;

if (valid_capture_ts_) {
next_capture_ts_ += sample_spec_.samples_per_chan_2_ns(diff_samples);
}
Expand Down Expand Up @@ -498,25 +513,40 @@ void Depacketizer::commit_frame_(Frame& frame,
}

void Depacketizer::periodic_report_() {
if (!rate_limiter_.allow()) {
if (!rate_limiter_.allow() || !is_started_) {
return;
}

const double loss_ratio = (missing_samples_ + decoded_samples_) != 0
? (double)missing_samples_ / (missing_samples_ + decoded_samples_)
: 0.;

const double drop_ratio =
fetched_packets_ != 0 ? (double)dropped_packets_ / fetched_packets_ : 0.;
const size_t total_samples = decoded_samples_ + missing_samples_;

roc_log(LogDebug,
"depacketizer:"
" fetched_pkts=%lu dropped_pkts=%lu loss_ratio=%.5lf late_ratio=%.5lf",
(unsigned long)fetched_packets_, (unsigned long)dropped_packets_, loss_ratio,
drop_ratio);
" period=%.2fms missing=%.2fms(%.3f%%)"
" late=%.2fms(%.3f%%) recovered=%.2fms(%.3f%%)",
sample_spec_.stream_timestamp_2_ms(total_samples),
sample_spec_.stream_timestamp_2_ms(missing_samples_),
(double)missing_samples_ / total_samples * 100,
sample_spec_.stream_timestamp_2_ms(late_samples_),
(double)late_samples_ / total_samples * 100,
sample_spec_.stream_timestamp_2_ms(recovered_samples_),
(double)recovered_samples_ / total_samples * 100);

decoded_samples_ = 0;
missing_samples_ = 0;
late_samples_ = 0;
recovered_samples_ = 0;
}

void Depacketizer::dump_() {
dbgio::CsvEntry e;
e.type = 'd';
e.n_fields = 4;
e.fields[0] = core::timestamp(core::ClockUnix);
e.fields[1] = metrics_.missing_samples;
e.fields[2] = metrics_.late_samples;
e.fields[3] = metrics_.recovered_samples;

missing_samples_ = decoded_samples_ = 0;
fetched_packets_ = dropped_packets_ = 0;
dumper_->write(e);
}

} // namespace audio
Expand Down
38 changes: 28 additions & 10 deletions src/internal_modules/roc_audio/depacketizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "roc_audio/sample_spec.h"
#include "roc_core/noncopyable.h"
#include "roc_core/rate_limiter.h"
#include "roc_dbgio/csv_dumper.h"
#include "roc_packet/ireader.h"

namespace roc {
Expand All @@ -27,22 +28,38 @@ namespace audio {
//! Metrics of depacketizer.
struct DepacketizerMetrics {
//! Cumulative count of packets from which we decoded samples.
//! Incremented each time depacketizer starts decoding a packet.
uint64_t decoded_packets;

//! Cumulative count of decoded samples.
uint64_t decoded_samples;

//! Cumulative count of samples missing due to losses or delays.
uint64_t missing_samples;

//! Cumulative count of packets dropped because they were late.
//! Incremented each time depacketizer drops a packet.
//! @note
//! This metric includes packets that were only partially late.
uint64_t late_packets;

//! Cumulative count of samples in late packets.
uint64_t late_samples;

//! Cumulative count of packets repaired by FEC.
//! Incremented each time depacketizer reads a packet with FlagRestored.
//! This metric excludes late packets that were repaired but then dropped.
//! @note
//! This metric excludes late packets that were repaired but then dropped.
uint64_t recovered_packets;

//! Cumulative count of samples in recovered packets.
uint64_t recovered_samples;

DepacketizerMetrics()
: decoded_packets(0)
, decoded_samples(0)
, missing_samples(0)
, late_packets(0)
, recovered_packets(0) {
, late_samples(0)
, recovered_packets(0)
, recovered_samples(0) {
}
};

Expand Down Expand Up @@ -78,7 +95,8 @@ class Depacketizer : public IFrameReader, public core::NonCopyable<> {
Depacketizer(packet::IReader& packet_reader,
IFrameDecoder& payload_decoder,
FrameFactory& frame_factory,
const SampleSpec& sample_spec);
const SampleSpec& sample_spec,
dbgio::CsvDumper* dumper);

//! Check if the object was successfully constructed.
status::StatusCode init_status() const;
Expand Down Expand Up @@ -141,6 +159,7 @@ class Depacketizer : public IFrameReader, public core::NonCopyable<> {
void commit_frame_(Frame& frame, size_t frame_samples, const FrameStats& stats);

void periodic_report_();
void dump_();

FrameFactory& frame_factory_;
packet::IReader& packet_reader_;
Expand All @@ -154,18 +173,17 @@ class Depacketizer : public IFrameReader, public core::NonCopyable<> {
core::nanoseconds_t next_capture_ts_;
bool valid_capture_ts_;

size_t padding_samples_;
size_t decoded_samples_;
size_t missing_samples_;

size_t fetched_packets_;
size_t dropped_packets_;
size_t late_samples_;
size_t recovered_samples_;

DepacketizerMetrics metrics_;

bool is_started_;

core::RateLimiter rate_limiter_;
dbgio::CsvDumper* dumper_;

status::StatusCode init_status_;
};
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_pipeline/receiver_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config,
pkt_encoding->sample_spec.channel_set());

depacketizer_.reset(new (depacketizer_) audio::Depacketizer(
*pkt_reader, *payload_decoder_, frame_factory, out_spec));
*pkt_reader, *payload_decoder_, frame_factory, out_spec, dumper_));
if ((init_status_ = depacketizer_->init_status()) != status::StatusOK) {
return;
}
Expand Down
Loading

0 comments on commit b26f9f0

Please sign in to comment.