Skip to content

Commit

Permalink
roc-streaminggh-765: Pipeline tests for adaptive latency & jitter
Browse files Browse the repository at this point in the history
- adaptive_latency_increase
- adaptive_latency_decrease
- adaptive_latency_upper_bound
- adaptive_latency_lower_bound
- metrics_jitter
  • Loading branch information
gavv committed Aug 7, 2024
1 parent c861e41 commit 4c60d80
Show file tree
Hide file tree
Showing 3 changed files with 451 additions and 22 deletions.
3 changes: 2 additions & 1 deletion src/internal_modules/roc_packet/router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ status::StatusCode Router::write(const PacketPtr& packet) {

if (Route* route = find_route_(packet->flags())) {
if (allow_route_(*route, *packet)) {
if (packet->udp()) {
if (packet->has_flags(Packet::FlagUDP)
&& packet->udp()->queue_timestamp == 0) {
packet->udp()->queue_timestamp = core::timestamp(core::ClockUnix);
}

Expand Down
51 changes: 41 additions & 10 deletions src/tests/roc_pipeline/test_helpers/packet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "test_helpers/utils.h"

#include "roc_audio/iframe_encoder.h"
#include "roc_core/fast_random.h"
#include "roc_core/noncopyable.h"
#include "roc_core/scoped_ptr.h"
#include "roc_fec/block_writer.h"
Expand Down Expand Up @@ -56,6 +57,9 @@ class PacketWriter : public core::NonCopyable<> {
, timestamp_(0)
, pt_(pt)
, sample_offset_(0)
, qts_(core::Minute * 10000)
, qts_jitter_lo_(0)
, qts_jitter_hi_(0)
, corrupt_flag_(false) {
construct_(arena, packet_factory, encoding_map, pt, packet::FEC_None,
fec::BlockWriterConfig());
Expand Down Expand Up @@ -85,6 +89,9 @@ class PacketWriter : public core::NonCopyable<> {
, timestamp_(0)
, pt_(pt)
, sample_offset_(0)
, qts_(core::Minute * 10000)
, qts_jitter_lo_(0)
, qts_jitter_hi_(0)
, corrupt_flag_(false) {
construct_(arena, packet_factory, encoding_map, pt, fec_scheme, fec_config);
}
Expand All @@ -97,7 +104,7 @@ class PacketWriter : public core::NonCopyable<> {
for (size_t np = 0; np < num_packets; np++) {
packet::PacketPtr pp = create_packet_(samples_per_packet, sample_spec);
CHECK(pp);
deliver_packet_(pp);
deliver_packet_(pp, sample_spec);
}
}

Expand All @@ -112,7 +119,7 @@ class PacketWriter : public core::NonCopyable<> {
}
}

void shift_to(size_t num_packets, size_t samples_per_packet) {
void jump_to(size_t num_packets, size_t samples_per_packet) {
seqnum_ = packet::seqnum_t(num_packets);
timestamp_ = packet::stream_timestamp_t(num_packets * samples_per_packet);
sample_offset_ = uint8_t(num_packets * samples_per_packet);
Expand Down Expand Up @@ -150,6 +157,11 @@ class PacketWriter : public core::NonCopyable<> {
timestamp_ = timestamp;
}

void set_jitter(core::nanoseconds_t jitter_lo, core::nanoseconds_t jitter_hi) {
qts_jitter_lo_ = jitter_lo;
qts_jitter_hi_ = jitter_hi;
}

void corrupt_packets(bool corrupt) {
corrupt_flag_ = corrupt;
}
Expand Down Expand Up @@ -246,6 +258,7 @@ class PacketWriter : public core::NonCopyable<> {
pp->rtp()->seqnum = seqnum_;
pp->rtp()->stream_timestamp = timestamp_;
pp->rtp()->payload_type = pt_;
pp->rtp()->duration = samples_per_packet;

seqnum_++;
timestamp_ += samples_per_packet;
Expand All @@ -271,7 +284,8 @@ class PacketWriter : public core::NonCopyable<> {
return pp;
}

void deliver_packet_(const packet::PacketPtr& pp) {
void deliver_packet_(const packet::PacketPtr& pp,
const audio::SampleSpec& sample_spec) {
if (fec_writer_) {
// fec_writer will produce source and repair packets and store in fec_queue
// note that we're calling copy_packet_() only after fec_writer, because
Expand All @@ -284,33 +298,47 @@ class PacketWriter : public core::NonCopyable<> {
while (fec_queue_.read(fp, packet::ModeFetch) == status::StatusOK) {
if (fp->has_flags(packet::Packet::FlagAudio)) {
CHECK(source_composer_->compose(*fp));
LONGS_EQUAL(status::StatusOK,
source_writer_->write(copy_packet_(fp)));
LONGS_EQUAL(
status::StatusOK,
source_writer_->write(prepare_for_delivery_(fp, sample_spec)));
} else {
CHECK(repair_composer_->compose(*fp));
LONGS_EQUAL(status::StatusOK,
repair_writer_->write(copy_packet_(fp)));
LONGS_EQUAL(
status::StatusOK,
repair_writer_->write(prepare_for_delivery_(fp, sample_spec)));
}
}
} else {
// compose and "deliver" packet
CHECK(source_composer_->compose(*pp));
LONGS_EQUAL(status::StatusOK, source_writer_->write(copy_packet_(pp)));
LONGS_EQUAL(status::StatusOK,
source_writer_->write(prepare_for_delivery_(pp, sample_spec)));
}
}

// creates a new packet with the same buffer, without copying any meta-information
// like flags, parsed fields, etc; this way we simulate that packet was "delivered"
// over network - packets enters receiver's pipeline without any meta-information,
// and receiver fills that meta-information using packet parsers
packet::PacketPtr copy_packet_(const packet::PacketPtr& pa) {
packet::PacketPtr prepare_for_delivery_(const packet::PacketPtr& pa,
const audio::SampleSpec& sample_spec) {
packet::PacketPtr pb = packet_factory_.new_packet();
CHECK(pb);

pb->add_flags(packet::Packet::FlagUDP);
pb->udp()->src_addr = src_addr_;
pb->udp()->dst_addr = source_dst_addr_;

// timestamp when the packet was "received"
pb->udp()->queue_timestamp = qts_;
if (pa->duration() > 0) {
qts_ += sample_spec.stream_timestamp_2_ns(pa->duration());
if (qts_jitter_hi_ > 0) {
qts_ += (core::nanoseconds_t)core::fast_random_range(
(uint64_t)qts_jitter_lo_, (uint64_t)qts_jitter_hi_);
}
}

pb->set_buffer(pa->buffer());

if (corrupt_flag_) {
Expand Down Expand Up @@ -344,9 +372,12 @@ class PacketWriter : public core::NonCopyable<> {
packet::stream_timestamp_t timestamp_;

rtp::PayloadType pt_;

uint8_t sample_offset_;

core::nanoseconds_t qts_;
core::nanoseconds_t qts_jitter_lo_;
core::nanoseconds_t qts_jitter_hi_;

bool corrupt_flag_;
};

Expand Down
Loading

0 comments on commit 4c60d80

Please sign in to comment.