diff --git a/src/internal_modules/roc_rtcp/communicator.cpp b/src/internal_modules/roc_rtcp/communicator.cpp index c0491f630..6d425674c 100644 --- a/src/internal_modules/roc_rtcp/communicator.cpp +++ b/src/internal_modules/roc_rtcp/communicator.cpp @@ -366,7 +366,7 @@ status::StatusCode Communicator::generate_reports(core::nanoseconds_t current_ti do { // TODO(gh-674): use IntervalComputer - next_deadline_ += core::Millisecond * 200; + next_deadline_ += config_.report_interval; } while (next_deadline_ <= current_time); roc_log(LogTrace, "rtcp communicator: generating report packets"); diff --git a/src/internal_modules/roc_rtcp/config.h b/src/internal_modules/roc_rtcp/config.h index 596f95182..e02791c86 100644 --- a/src/internal_modules/roc_rtcp/config.h +++ b/src/internal_modules/roc_rtcp/config.h @@ -22,6 +22,9 @@ namespace rtcp { //! RTCP config. struct Config { + //! Interval between reports. + core::nanoseconds_t report_interval; + //! Timeout to remove inactive streams. core::nanoseconds_t inactivity_timeout; @@ -38,7 +41,8 @@ struct Config { bool enable_sdes; Config() - : inactivity_timeout(core::Second * 5) + : report_interval(core::Millisecond * 200) + , inactivity_timeout(core::Second * 5) , enable_sr_rr(true) , enable_xr(true) , enable_sdes(true) { diff --git a/src/tests/roc_pipeline/test_helpers/control_writer.h b/src/tests/roc_pipeline/test_helpers/control_writer.h index 387a12280..6bdeb88dd 100644 --- a/src/tests/roc_pipeline/test_helpers/control_writer.h +++ b/src/tests/roc_pipeline/test_helpers/control_writer.h @@ -13,12 +13,17 @@ #include "test_helpers/utils.h" +#include "roc_audio/latency_tuner.h" #include "roc_core/buffer_factory.h" #include "roc_core/noncopyable.h" +#include "roc_core/time.h" +#include "roc_packet/ilink_meter.h" #include "roc_packet/iwriter.h" #include "roc_packet/ntp.h" #include "roc_packet/packet_factory.h" +#include "roc_packet/units.h" #include "roc_rtcp/builder.h" +#include "roc_rtcp/headers.h" namespace roc { namespace pipeline { @@ -30,7 +35,6 @@ class ControlWriter : public core::NonCopyable<> { ControlWriter(packet::IWriter& writer, packet::PacketFactory& packet_factory, core::BufferFactory& buffer_factory, - packet::stream_source_t src_id, const address::SocketAddr& src_addr, const address::SocketAddr& dst_addr) : writer_(writer) @@ -38,7 +42,8 @@ class ControlWriter : public core::NonCopyable<> { , buffer_factory_(buffer_factory) , src_addr_(src_addr) , dst_addr_(dst_addr) - , source_(src_id) { + , local_source_(0) + , remote_source_(0) { } void write_sender_report(packet::ntp_timestamp_t ntp_ts, @@ -52,15 +57,15 @@ class ControlWriter : public core::NonCopyable<> { rtcp::Builder bld(cfg, buff); rtcp::header::SenderReportPacket sr; - sr.set_ssrc(source_); + sr.set_ssrc(local_source_); sr.set_ntp_timestamp(ntp_ts); sr.set_rtp_timestamp(rtp_ts); rtcp::SdesChunk chunk; - chunk.ssrc = source_; + chunk.ssrc = local_source_; rtcp::SdesItem item; item.type = rtcp::header::SDES_CNAME; - item.text = "test_cname"; + item.text = "test_send_cname"; bld.begin_sr(sr); bld.end_sr(); @@ -71,11 +76,85 @@ class ControlWriter : public core::NonCopyable<> { bld.end_sdes_chunk(); bld.end_sdes(); - UNSIGNED_LONGS_EQUAL(status::StatusOK, writer_.write(new_packet_(buff))); + LONGS_EQUAL(status::StatusOK, writer_.write(new_packet_(buff))); } - void set_source(packet::stream_source_t source) { - source_ = source; + void write_receiver_report(const audio::SampleSpec& sample_spec) { + core::Slice buff = buffer_factory_.new_buffer(); + CHECK(buff); + + buff.reslice(0, 0); + + rtcp::Config cfg; + rtcp::Builder bld(cfg, buff); + + rtcp::header::ReceiverReportPacket rr; + rr.set_ssrc(local_source_); + + rtcp::header::ReceptionReportBlock rr_blk; + rr_blk.set_ssrc(remote_source_); + rr_blk.set_cum_loss(link_metrics_.lost_packets); + rr_blk.set_last_seqnum(link_metrics_.ext_last_seqnum); + rr_blk.set_jitter(sample_spec.ns_2_stream_timestamp(link_metrics_.jitter)); + + rtcp::header::XrPacket xr; + xr.set_ssrc(local_source_); + + rtcp::header::XrMeasurementInfoBlock ms_info; + ms_info.set_ssrc(remote_source_); + ms_info.set_first_seq((packet::seqnum_t)link_metrics_.ext_first_seqnum); + + rtcp::header::XrDelayMetricsBlock delay_metrics; + delay_metrics.set_ssrc(remote_source_); + delay_metrics.set_e2e_latency( + packet::nanoseconds_2_ntp(latency_metrics_.e2e_latency)); + + rtcp::header::XrQueueMetricsBlock queue_metrics; + queue_metrics.set_ssrc(remote_source_); + queue_metrics.set_niq_latency( + packet::nanoseconds_2_ntp(latency_metrics_.niq_latency)); + queue_metrics.set_niq_stalling( + packet::nanoseconds_2_ntp(latency_metrics_.niq_stalling)); + + rtcp::SdesChunk chunk; + chunk.ssrc = local_source_; + rtcp::SdesItem item; + item.type = rtcp::header::SDES_CNAME; + item.text = "test_recv_cname"; + + bld.begin_rr(rr); + bld.add_rr_report(rr_blk); + bld.end_rr(); + + bld.begin_xr(xr); + bld.add_xr_measurement_info(ms_info); + bld.add_xr_delay_metrics(delay_metrics); + bld.add_xr_queue_metrics(queue_metrics); + bld.end_xr(); + + bld.begin_sdes(); + bld.begin_sdes_chunk(chunk); + bld.add_sdes_item(item); + bld.end_sdes_chunk(); + bld.end_sdes(); + + LONGS_EQUAL(status::StatusOK, writer_.write(new_packet_(buff))); + } + + void set_local_source(packet::stream_source_t source) { + local_source_ = source; + } + + void set_remote_source(packet::stream_source_t source) { + remote_source_ = source; + } + + void set_link_metrics(const packet::LinkMetrics& link_metrics) { + link_metrics_ = link_metrics; + } + + void set_latency_metrics(const audio::LatencyMetrics& latency_metrics) { + latency_metrics_ = latency_metrics; } private: @@ -101,7 +180,11 @@ class ControlWriter : public core::NonCopyable<> { address::SocketAddr src_addr_; address::SocketAddr dst_addr_; - packet::stream_source_t source_; + packet::stream_source_t local_source_; + packet::stream_source_t remote_source_; + + packet::LinkMetrics link_metrics_; + audio::LatencyMetrics latency_metrics_; }; } // namespace test diff --git a/src/tests/roc_pipeline/test_helpers/packet_proxy.h b/src/tests/roc_pipeline/test_helpers/packet_proxy.h deleted file mode 100644 index b5a61dc02..000000000 --- a/src/tests/roc_pipeline/test_helpers/packet_proxy.h +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright (c) 2015 Roc Streaming authors - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -#ifndef ROC_PIPELINE_TEST_HELPERS_PACKET_PROXY_H_ -#define ROC_PIPELINE_TEST_HELPERS_PACKET_PROXY_H_ - -#include - -#include "roc_core/log.h" -#include "roc_core/noncopyable.h" -#include "roc_packet/iwriter.h" -#include "roc_packet/packet_factory.h" -#include "roc_packet/queue.h" -#include "roc_rtcp/print_packet.h" -#include "roc_status/status_code.h" - -namespace roc { -namespace pipeline { -namespace test { - -// Copy sequence of packets to multiple writers -// Routes packet by type -// Clears packet meta-data as if packet was delivered over network -class PacketProxy : public packet::IWriter, core::NonCopyable<> { -public: - PacketProxy(packet::PacketFactory& packet_factory, - const address::SocketAddr& proxy_addr, - packet::IWriter* source_writer, - packet::IWriter* repair_writer, - packet::IWriter* control_writer) - : packet_factory_(packet_factory) - , proxy_addr_(proxy_addr) - , source_writer_(source_writer) - , repair_writer_(repair_writer) - , control_writer_(control_writer) - , n_source_(0) - , n_repair_(0) - , n_control_(0) { - } - - virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr& pp) { - return queue_.write(pp); - } - - size_t n_source() const { - return n_source_; - } - - size_t n_repair() const { - return n_repair_; - } - - size_t n_control() const { - return n_control_; - } - - void deliver(size_t n_source_packets) { - for (size_t np = 0; np < n_source_packets;) { - packet::PacketPtr pp; - const status::StatusCode code = queue_.read(pp); - if (code != status::StatusOK) { - UNSIGNED_LONGS_EQUAL(status::StatusNoData, code); - break; - } - - if (core::Logger::instance().get_level() >= LogTrace) { - pp->print(packet::PrintHeaders); - if (pp->rtcp()) { - rtcp::print_packet(pp->rtcp()->payload); - } - } - - if (pp->flags() & packet::Packet::FlagControl) { - CHECK(control_writer_); - UNSIGNED_LONGS_EQUAL(status::StatusOK, - control_writer_->write(copy_packet_(pp))); - n_control_++; - } else if (pp->flags() & packet::Packet::FlagRepair) { - CHECK(repair_writer_); - UNSIGNED_LONGS_EQUAL(status::StatusOK, - repair_writer_->write(copy_packet_(pp))); - n_repair_++; - } else { - CHECK(source_writer_); - UNSIGNED_LONGS_EQUAL(status::StatusOK, - source_writer_->write(copy_packet_(pp))); - n_source_++; - np++; - } - } - } - -private: - // creates a new packet with the same buffer, without copying any meta-information - // like flags, parsed fields, etc; this way we simulate that packet was "delivered" - // over network - packets enters receiver's pipeline without any meta-information, - // and receiver fills that meta-information using packet parsers - packet::PacketPtr copy_packet_(const packet::PacketPtr& pa) { - packet::PacketPtr pb = packet_factory_.new_packet(); - CHECK(pb); - - CHECK(pa->flags() & packet::Packet::FlagUDP); - pb->add_flags(packet::Packet::FlagUDP); - *pb->udp() = *pa->udp(); - pb->udp()->src_addr = proxy_addr_; - - pb->set_buffer(pa->buffer()); - - return pb; - } - - packet::PacketFactory& packet_factory_; - - address::SocketAddr proxy_addr_; - - packet::IWriter* source_writer_; - packet::IWriter* repair_writer_; - packet::IWriter* control_writer_; - - size_t n_source_; - size_t n_repair_; - size_t n_control_; - - packet::Queue queue_; -}; - -} // namespace test -} // namespace pipeline -} // namespace roc - -#endif // ROC_PIPELINE_TEST_HELPERS_PACKET_PROXY_H_ diff --git a/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp b/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp index 51a8521d1..e01b7639d 100644 --- a/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp +++ b/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp @@ -10,17 +10,16 @@ #include "test_helpers/frame_reader.h" #include "test_helpers/frame_writer.h" -#include "test_helpers/packet_proxy.h" -#include "roc_address/interface.h" #include "roc_core/buffer_factory.h" #include "roc_core/heap_arena.h" -#include "roc_core/time.h" #include "roc_fec/codec_map.h" +#include "roc_packet/ireader.h" #include "roc_packet/packet_factory.h" #include "roc_packet/queue.h" #include "roc_pipeline/receiver_source.h" #include "roc_pipeline/sender_sink.h" +#include "roc_rtcp/print_packet.h" #include "roc_rtp/encoding_map.h" // This file contains integration tests that combine SenderSink and ReceiverSource. @@ -38,8 +37,8 @@ // // The tests use three helper classes: // - test::FrameWriter - to produce frames -// - test::PacketProxy - to simulate delivery of packets from sender to receiver // - test::FrameReader - to retrieve and validate frames +// - PacketProxy - to simulate delivery of packets from sender to receiver // // test::FrameWriter simulates sender sound card that produces frames, and // test::FrameReader simulates receiver sound card that consumes frames. @@ -69,6 +68,7 @@ enum { Latency = SamplesPerPacket * SourcePackets, Timeout = Latency * 20, + Warmup = SamplesPerPacket * 3, ManyFrames = Latency / SamplesPerFrame * 10, }; @@ -108,6 +108,125 @@ core::BufferFactory byte_buffer_factory(arena, MaxBufSize); packet::PacketFactory packet_factory(arena); rtp::EncodingMap encoding_map(arena); +// Copy sequence of packets to multiple writers. +// Routes packet by type. +// Clears packet meta-data as if packet was delivered over network. +// Simulates packet losses. +class PacketProxy : core::NonCopyable<> { +public: + PacketProxy(packet::PacketFactory& packet_factory, + const address::SocketAddr& proxy_addr, + packet::IWriter* source_writer, + packet::IWriter* repair_writer, + packet::IWriter* control_writer, + int flags) + : packet_factory_(packet_factory) + , proxy_addr_(proxy_addr) + , source_writer_(source_writer) + , repair_writer_(repair_writer) + , control_writer_(control_writer) + , n_source_(0) + , n_repair_(0) + , n_control_(0) + , flags_(flags) + , counter_(0) { + } + + size_t n_source() const { + return n_source_; + } + + size_t n_repair() const { + return n_repair_; + } + + size_t n_control() const { + return n_control_; + } + + void deliver_from(packet::IReader& reader) { + for (;;) { + packet::PacketPtr pp; + const status::StatusCode code = reader.read(pp); + if (code != status::StatusOK) { + UNSIGNED_LONGS_EQUAL(status::StatusNoData, code); + break; + } + + if ((flags_ & FlagLosses) + && counter_++ % (SourcePackets + RepairPackets) == 1) { + continue; + } + + if (pp->flags() & packet::Packet::FlagAudio) { + if (flags_ & FlagDropSource) { + continue; + } + print_packet_(pp); + CHECK(source_writer_); + LONGS_EQUAL(status::StatusOK, source_writer_->write(copy_packet_(pp))); + n_source_++; + } else if (pp->flags() & packet::Packet::FlagRepair) { + if (flags_ & FlagDropRepair) { + continue; + } + print_packet_(pp); + CHECK(repair_writer_); + LONGS_EQUAL(status::StatusOK, repair_writer_->write(copy_packet_(pp))); + n_repair_++; + } else if (pp->flags() & packet::Packet::FlagControl) { + print_packet_(pp); + CHECK(control_writer_); + LONGS_EQUAL(status::StatusOK, control_writer_->write(copy_packet_(pp))); + n_control_++; + } + } + } + +private: + // creates a new packet with the same buffer, without copying any meta-information + // like flags, parsed fields, etc; this way we simulate that packet was "delivered" + // over network - packets enters receiver's pipeline without any meta-information, + // and receiver fills that meta-information using packet parsers + packet::PacketPtr copy_packet_(const packet::PacketPtr& pa) { + packet::PacketPtr pb = packet_factory_.new_packet(); + CHECK(pb); + + CHECK(pa->flags() & packet::Packet::FlagUDP); + pb->add_flags(packet::Packet::FlagUDP); + *pb->udp() = *pa->udp(); + pb->udp()->src_addr = proxy_addr_; + + pb->set_buffer(pa->buffer()); + + return pb; + } + + void print_packet_(const packet::PacketPtr& pp) { + if (core::Logger::instance().get_level() >= LogTrace) { + pp->print(packet::PrintHeaders); + if (pp->rtcp()) { + rtcp::print_packet(pp->rtcp()->payload); + } + } + } + + packet::PacketFactory& packet_factory_; + + address::SocketAddr proxy_addr_; + + packet::IWriter* source_writer_; + packet::IWriter* repair_writer_; + packet::IWriter* control_writer_; + + size_t n_source_; + size_t n_repair_; + size_t n_control_; + + int flags_; + size_t counter_; +}; + SenderConfig make_sender_config(int flags, audio::ChannelMask frame_channels, audio::ChannelMask packet_channels) { @@ -146,6 +265,7 @@ SenderConfig make_sender_config(int flags, config.enable_timing = false; config.enable_profiling = true; + config.rtcp.report_interval = SamplesPerPacket * core::Second / SampleRate; config.rtcp.inactivity_timeout = Timeout * core::Second / SampleRate; return config; @@ -164,6 +284,7 @@ ReceiverConfig make_receiver_config(audio::ChannelMask frame_channels, config.common.enable_timing = false; + config.common.rtcp.report_interval = SamplesPerPacket * core::Second / SampleRate; config.common.rtcp.inactivity_timeout = Timeout * core::Second / SampleRate; config.default_session.latency.tuner_backend = audio::LatencyTunerBackend_Niq; @@ -172,8 +293,6 @@ ReceiverConfig make_receiver_config(audio::ChannelMask frame_channels, config.default_session.watchdog.no_playback_timeout = Timeout * core::Second / SampleRate; - (void)packet_channels; - return config; } @@ -214,26 +333,70 @@ bool is_fec_supported(int flags) { return true; } -void filter_packets(int flags, packet::IReader& reader, packet::IWriter& writer) { - size_t counter = 0; +void check_metrics(ReceiverSlot& receiver, SenderSlot& sender, int flags) { + ReceiverSlotMetrics recv_metrics; + ReceiverParticipantMetrics recv_party_metrics; + size_t recv_party_count = 1; + receiver.get_metrics(recv_metrics, &recv_party_metrics, &recv_party_count); - packet::PacketPtr pp; - while (reader.read(pp) == status::StatusOK) { - if ((flags & FlagLosses) && counter++ % (SourcePackets + RepairPackets) == 1) { - continue; - } + CHECK(recv_metrics.source_id > 0); - if (pp->flags() & packet::Packet::FlagRepair) { - if (flags & FlagDropRepair) { - continue; - } + UNSIGNED_LONGS_EQUAL(1, recv_metrics.num_participants); + UNSIGNED_LONGS_EQUAL(1, recv_party_count); + + CHECK(recv_party_metrics.link.ext_first_seqnum > 0); + CHECK(recv_party_metrics.link.ext_last_seqnum > 0); + + // TODO(gh-688): check that metrics are non-zero: + // - total_packets + // - lost_packets + // - jitter + + CHECK(recv_party_metrics.latency.niq_latency > 0); + CHECK(recv_party_metrics.latency.niq_stalling >= 0); + + if ((flags & FlagRTCP) && (flags & FlagCTS)) { + CHECK(recv_party_metrics.latency.e2e_latency > 0); + } else { + CHECK(recv_party_metrics.latency.e2e_latency == 0); + } + + SenderSlotMetrics send_metrics; + SenderParticipantMetrics send_party_metrics; + size_t send_party_count = 1; + sender.get_metrics(send_metrics, &send_party_metrics, &send_party_count); + + CHECK(send_metrics.source_id > 0); + + if (flags & FlagRTCP) { + UNSIGNED_LONGS_EQUAL(1, send_metrics.num_participants); + UNSIGNED_LONGS_EQUAL(1, send_party_count); + + UNSIGNED_LONGS_EQUAL(recv_party_metrics.link.ext_first_seqnum, + send_party_metrics.link.ext_first_seqnum); + CHECK(packet::seqnum_diff(recv_party_metrics.link.ext_last_seqnum, + send_party_metrics.link.ext_last_seqnum) + <= 1); + + // TODO(gh-688): check that metrics are equal on sender and receiver: + // - total_packets + // - lost_packets + // - jitter + + DOUBLES_EQUAL(recv_party_metrics.latency.niq_latency, + send_party_metrics.latency.niq_latency, core::Millisecond); + DOUBLES_EQUAL(recv_party_metrics.latency.niq_stalling, + send_party_metrics.latency.niq_stalling, core::Millisecond); + + if (flags & FlagCTS) { + DOUBLES_EQUAL(recv_party_metrics.latency.e2e_latency, + send_party_metrics.latency.e2e_latency, core::Microsecond); } else { - if (flags & FlagDropSource) { - continue; - } + CHECK(send_party_metrics.latency.e2e_latency == 0); } - - LONGS_EQUAL(status::StatusOK, writer.write(pp)); + } else { + UNSIGNED_LONGS_EQUAL(0, send_metrics.num_participants); + UNSIGNED_LONGS_EQUAL(0, send_party_count); } } @@ -268,6 +431,8 @@ void send_receive(int flags, SenderEndpoint* sender_repair_endpoint = NULL; SenderEndpoint* sender_control_endpoint = NULL; + packet::IWriter* sender_control_endpoint_writer = NULL; + sender_source_endpoint = sender_slot->add_endpoint(address::Iface_AudioSource, source_proto, receiver_source_addr, sender_outbound_queue); @@ -285,6 +450,7 @@ void send_receive(int flags, sender_slot->add_endpoint(address::Iface_AudioControl, control_proto, receiver_control_addr, sender_outbound_queue); CHECK(sender_control_endpoint); + sender_control_endpoint_writer = sender_control_endpoint->inbound_writer(); } ReceiverConfig receiver_config = @@ -326,63 +492,72 @@ void send_receive(int flags, } core::nanoseconds_t send_base_cts = -1; + core::nanoseconds_t virtual_e2e_latency = 0; + if (flags & FlagCTS) { send_base_cts = 1000000000000000; + virtual_e2e_latency = core::Millisecond * 100; } test::FrameWriter frame_writer(sender, sample_buffer_factory); + PacketProxy proxy(packet_factory, sender_addr, receiver_source_endpoint_writer, + receiver_repair_endpoint_writer, receiver_control_endpoint_writer, + flags); + + PacketProxy reverse_proxy(packet_factory, receiver_control_addr, NULL, NULL, + sender_control_endpoint_writer, flags); + + test::FrameReader frame_reader(receiver, sample_buffer_factory); + for (size_t nf = 0; nf < ManyFrames; nf++) { frame_writer.write_samples(SamplesPerFrame, sender_config.input_sample_spec, send_base_cts); sender.refresh(frame_writer.refresh_ts(send_base_cts)); - } - - test::PacketProxy packet_proxy( - packet_factory, sender_addr, receiver_source_endpoint_writer, - receiver_repair_endpoint_writer, receiver_control_endpoint_writer); - - filter_packets(flags, sender_outbound_queue, packet_proxy); - - test::FrameReader frame_reader(receiver, sample_buffer_factory); - packet_proxy.deliver(Latency / SamplesPerPacket); + proxy.deliver_from(sender_outbound_queue); - for (size_t np = 0; np < ManyFrames / FramesPerPacket; np++) { - for (size_t nf = 0; nf < FramesPerPacket; nf++) { + if (nf > Latency / SamplesPerFrame) { core::nanoseconds_t recv_base_cts = -1; if (flags & FlagCTS) { recv_base_cts = send_base_cts; } receiver.refresh(frame_reader.refresh_ts(recv_base_cts)); - frame_reader.read_samples(SamplesPerFrame, num_sessions, receiver_config.common.output_sample_spec, recv_base_cts); + if (flags & FlagCTS) { + receiver.reclock(frame_reader.last_capture_ts() + virtual_e2e_latency); + } + LONGS_EQUAL(num_sessions, receiver.num_sessions()); - } - packet_proxy.deliver(1); + reverse_proxy.deliver_from(receiver_outbound_queue); + + if (num_sessions == 1 && nf > (Latency + Warmup) / SamplesPerFrame) { + check_metrics(*receiver_slot, *sender_slot, flags); + } + } } if ((flags & FlagDropSource) == 0) { - CHECK(packet_proxy.n_source() > 0); + CHECK(proxy.n_source() > 0); } else { - CHECK(packet_proxy.n_source() == 0); + CHECK(proxy.n_source() == 0); } if ((flags & FlagDropRepair) == 0 && (flags & (FlagReedSolomon | FlagLDPC)) != 0) { - CHECK(packet_proxy.n_repair() > 0); + CHECK(proxy.n_repair() > 0); } else { - CHECK(packet_proxy.n_repair() == 0); + CHECK(proxy.n_repair() == 0); } if ((flags & FlagRTCP) != 0) { - CHECK(packet_proxy.n_control() > 0); + CHECK(proxy.n_control() > 0); } else { - CHECK(packet_proxy.n_control() == 0); + CHECK(proxy.n_control() == 0); } } diff --git a/src/tests/roc_pipeline/test_receiver_source.cpp b/src/tests/roc_pipeline/test_receiver_source.cpp index 61654252b..c26d6b10a 100644 --- a/src/tests/roc_pipeline/test_receiver_source.cpp +++ b/src/tests/roc_pipeline/test_receiver_source.cpp @@ -1884,8 +1884,9 @@ TEST(receiver_source, timestamp_mapping_one_control_packet) { src_addr1, dst_addr1, PayloadType_Ch2); test::ControlWriter control_writer(*control_endpoint, packet_factory, - byte_buffer_factory, src_id1, src_addr1, - dst_addr2); + byte_buffer_factory, src_addr1, dst_addr2); + + control_writer.set_local_source(src_id1); const core::nanoseconds_t capture_ts_base = 1000000000000000; const packet::stream_timestamp_t rtp_base = 1000000; @@ -1951,8 +1952,9 @@ TEST(receiver_source, timestamp_mapping_periodic_control_packets) { src_addr1, dst_addr1, PayloadType_Ch2); test::ControlWriter control_writer(*control_endpoint, packet_factory, - byte_buffer_factory, src_id1, src_addr1, - dst_addr2); + byte_buffer_factory, src_addr1, dst_addr2); + + control_writer.set_local_source(src_id1); const core::nanoseconds_t capture_ts_step = 1000000000000000; const packet::stream_timestamp_t rtp_base = 1000000; @@ -2021,8 +2023,9 @@ TEST(receiver_source, timestamp_mapping_remixing) { src_addr1, dst_addr1, PayloadType_Ch1); test::ControlWriter control_writer(*control_endpoint, packet_factory, - byte_buffer_factory, src_id1, src_addr1, - dst_addr2); + byte_buffer_factory, src_addr1, dst_addr2); + + control_writer.set_local_source(src_id1); const core::nanoseconds_t unix_base = 1000000000000000; const packet::stream_timestamp_t rtp_base = 1000000; @@ -2077,8 +2080,9 @@ TEST(receiver_source, timestamp_mapping_remixing) { CHECK(first_ts); } +// Check receiver metrics for multiple remote participants (senders). TEST(receiver_source, metrics_participants) { - enum { Rate = SampleRate, Chans = Chans_Stereo, MaxSess = 10 }; + enum { Rate = SampleRate, Chans = Chans_Stereo, MaxParties = 10 }; init(Rate, Chans, Rate, Chans); @@ -2091,8 +2095,8 @@ TEST(receiver_source, metrics_participants) { { ReceiverSlotMetrics slot_metrics; - ReceiverParticipantMetrics party_metrics[MaxSess]; - size_t party_metrics_size = MaxSess; + ReceiverParticipantMetrics party_metrics[MaxParties]; + size_t party_metrics_size = MaxParties; slot->get_metrics(slot_metrics, party_metrics, &party_metrics_size); @@ -2115,11 +2119,12 @@ TEST(receiver_source, metrics_participants) { { ReceiverSlotMetrics slot_metrics; - ReceiverParticipantMetrics party_metrics[MaxSess]; - size_t party_metrics_size = MaxSess; + ReceiverParticipantMetrics party_metrics[MaxParties]; + size_t party_metrics_size = MaxParties; slot->get_metrics(slot_metrics, party_metrics, &party_metrics_size); + CHECK(slot_metrics.source_id != 0); UNSIGNED_LONGS_EQUAL(0, slot_metrics.num_participants); UNSIGNED_LONGS_EQUAL(0, party_metrics_size); } @@ -2136,11 +2141,12 @@ TEST(receiver_source, metrics_participants) { { ReceiverSlotMetrics slot_metrics; - ReceiverParticipantMetrics party_metrics[MaxSess]; - size_t party_metrics_size = MaxSess; + ReceiverParticipantMetrics party_metrics[MaxParties]; + size_t party_metrics_size = MaxParties; slot->get_metrics(slot_metrics, party_metrics, &party_metrics_size); + CHECK(slot_metrics.source_id != 0); UNSIGNED_LONGS_EQUAL(1, slot_metrics.num_participants); UNSIGNED_LONGS_EQUAL(1, party_metrics_size); @@ -2169,11 +2175,12 @@ TEST(receiver_source, metrics_participants) { { ReceiverSlotMetrics slot_metrics; - ReceiverParticipantMetrics party_metrics[MaxSess]; - size_t party_metrics_size = MaxSess; + ReceiverParticipantMetrics party_metrics[MaxParties]; + size_t party_metrics_size = MaxParties; slot->get_metrics(slot_metrics, party_metrics, &party_metrics_size); + CHECK(slot_metrics.source_id != 0); UNSIGNED_LONGS_EQUAL(2, slot_metrics.num_participants); UNSIGNED_LONGS_EQUAL(2, party_metrics_size); @@ -2189,7 +2196,7 @@ TEST(receiver_source, metrics_participants) { // Check how receiver returns metrics if provided buffer for metrics // is smaller than needed. TEST(receiver_source, metrics_truncation) { - enum { Rate = SampleRate, Chans = Chans_Stereo, MaxSess = 10 }; + enum { Rate = SampleRate, Chans = Chans_Stereo, MaxParties = 10 }; init(Rate, Chans, Rate, Chans); @@ -2227,9 +2234,9 @@ TEST(receiver_source, metrics_truncation) { UNSIGNED_LONGS_EQUAL(2, receiver.num_sessions()); - { // metrics_size=0 num_sessions=2 + { // metrics_size=0 num_participants=2 ReceiverSlotMetrics slot_metrics; - ReceiverParticipantMetrics party_metrics[MaxSess]; + ReceiverParticipantMetrics party_metrics[MaxParties]; size_t party_metrics_size = 0; slot->get_metrics(slot_metrics, party_metrics, &party_metrics_size); @@ -2238,13 +2245,14 @@ TEST(receiver_source, metrics_truncation) { UNSIGNED_LONGS_EQUAL(0, party_metrics_size); } - { // metrics_size=1 num_sessions=2 + { // metrics_size=1 num_participants=2 ReceiverSlotMetrics slot_metrics; - ReceiverParticipantMetrics party_metrics[MaxSess]; + ReceiverParticipantMetrics party_metrics[MaxParties]; size_t party_metrics_size = 1; slot->get_metrics(slot_metrics, party_metrics, &party_metrics_size); + CHECK(slot_metrics.source_id != 0); UNSIGNED_LONGS_EQUAL(2, slot_metrics.num_participants); UNSIGNED_LONGS_EQUAL(1, party_metrics_size); @@ -2252,27 +2260,30 @@ TEST(receiver_source, metrics_truncation) { CHECK(party_metrics[1].latency.niq_latency == 0); } - { // metrics_size=2 num_sessions=2 + { // metrics_size=2 num_participants=2 ReceiverSlotMetrics slot_metrics; - ReceiverParticipantMetrics party_metrics[MaxSess]; + ReceiverParticipantMetrics party_metrics[MaxParties]; size_t party_metrics_size = 2; slot->get_metrics(slot_metrics, party_metrics, &party_metrics_size); + CHECK(slot_metrics.source_id != 0); UNSIGNED_LONGS_EQUAL(2, slot_metrics.num_participants); UNSIGNED_LONGS_EQUAL(2, party_metrics_size); + CHECK(party_metrics[0].latency.niq_latency > 0); CHECK(party_metrics[1].latency.niq_latency > 0); CHECK(party_metrics[2].latency.niq_latency == 0); } - { // metrics_size=3 num_sessions=2 + { // metrics_size=3 num_participants=2 ReceiverSlotMetrics slot_metrics; - ReceiverParticipantMetrics party_metrics[MaxSess]; + ReceiverParticipantMetrics party_metrics[MaxParties]; size_t party_metrics_size = 3; slot->get_metrics(slot_metrics, party_metrics, &party_metrics_size); + CHECK(slot_metrics.source_id != 0); UNSIGNED_LONGS_EQUAL(2, slot_metrics.num_participants); UNSIGNED_LONGS_EQUAL(2, party_metrics_size); @@ -2282,9 +2293,20 @@ TEST(receiver_source, metrics_truncation) { } } -// Check niq_latency metric (network incoming queue size). +// Check how receiver computes packet metrics: +// total_packets, lost_packets, ext_first_seqnum, ext_last_seqnum +IGNORE_TEST(receiver_source, metrics_packet_counters) { + // TODO(gh-688): implement test +} + +// Check how receiver computes jitter metric. +IGNORE_TEST(receiver_source, metrics_jitter) { + // TODO(gh-688): implement test +} + +// Check how receiver computes niq_latency metric (network incoming queue size). TEST(receiver_source, metrics_niq_latency) { - enum { Rate = SampleRate, Chans = Chans_Stereo, MaxSess = 10 }; + enum { Rate = SampleRate, Chans = Chans_Stereo, MaxParties = 10 }; init(Rate, Chans, Rate, Chans); @@ -2323,11 +2345,12 @@ TEST(receiver_source, metrics_niq_latency) { { ReceiverSlotMetrics slot_metrics; - ReceiverParticipantMetrics party_metrics[MaxSess]; - size_t party_metrics_size = MaxSess; + ReceiverParticipantMetrics party_metrics[MaxParties]; + size_t party_metrics_size = MaxParties; slot->get_metrics(slot_metrics, party_metrics, &party_metrics_size); + CHECK(slot_metrics.source_id != 0); UNSIGNED_LONGS_EQUAL(1, slot_metrics.num_participants); UNSIGNED_LONGS_EQUAL(1, party_metrics_size); @@ -2337,10 +2360,10 @@ TEST(receiver_source, metrics_niq_latency) { } } -// Check e2e_latency metric (estimated end-to-end latency). +// Check how receiver computes e2e_latency metric (estimated end-to-end latency). // This metrics requires control packets exchange. TEST(receiver_source, metrics_e2e_latency) { - enum { Rate = SampleRate, Chans = Chans_Stereo, MaxSess = 10 }; + enum { Rate = SampleRate, Chans = Chans_Stereo, MaxParties = 10 }; init(Rate, Chans, Rate, Chans); @@ -2369,8 +2392,9 @@ TEST(receiver_source, metrics_e2e_latency) { src_addr1, dst_addr1, PayloadType_Ch2); test::ControlWriter control_writer(*control_endpoint, packet_factory, - byte_buffer_factory, src_id1, src_addr1, - dst_addr2); + byte_buffer_factory, src_addr1, dst_addr2); + + control_writer.set_local_source(src_id1); const core::nanoseconds_t capture_ts_base = 1000000000000000; const packet::stream_timestamp_t rtp_base = 1000000; @@ -2404,11 +2428,12 @@ TEST(receiver_source, metrics_e2e_latency) { { ReceiverSlotMetrics slot_metrics; - ReceiverParticipantMetrics party_metrics[MaxSess]; - size_t party_metrics_size = MaxSess; + ReceiverParticipantMetrics party_metrics[MaxParties]; + size_t party_metrics_size = MaxParties; slot->get_metrics(slot_metrics, party_metrics, &party_metrics_size); + CHECK(slot_metrics.source_id != 0); UNSIGNED_LONGS_EQUAL(1, slot_metrics.num_participants); UNSIGNED_LONGS_EQUAL(1, party_metrics_size); diff --git a/src/tests/roc_pipeline/test_sender_sink.cpp b/src/tests/roc_pipeline/test_sender_sink.cpp index d726b8598..e8114f8de 100644 --- a/src/tests/roc_pipeline/test_sender_sink.cpp +++ b/src/tests/roc_pipeline/test_sender_sink.cpp @@ -8,6 +8,7 @@ #include +#include "test_helpers/control_writer.h" #include "test_helpers/frame_writer.h" #include "test_helpers/packet_reader.h" @@ -83,15 +84,33 @@ void create_transport_endpoint(SenderSlot* slot, CHECK(!endpoint->inbound_writer()); } +packet::IWriter* create_control_endpoint(SenderSlot* slot, + address::Interface iface, + address::Protocol proto, + const address::SocketAddr& outbound_address, + packet::IWriter& outbound_writer) { + CHECK(slot); + SenderEndpoint* endpoint = + slot->add_endpoint(iface, proto, outbound_address, outbound_writer); + CHECK(endpoint); + CHECK(endpoint->inbound_writer()); + return endpoint->inbound_writer(); +} + } // namespace TEST_GROUP(sender_sink) { audio::SampleSpec input_sample_spec; audio::SampleSpec packet_sample_spec; - address::Protocol source_proto; + address::Protocol proto; + + address::SocketAddr src_addr; address::SocketAddr dst_addr; + packet::stream_source_t src_ssrc; + packet::stream_source_t dst_ssrc; + SenderConfig make_config() { SenderConfig config; @@ -137,8 +156,13 @@ TEST_GROUP(sender_sink) { packet_sample_spec.channel_set().set_order(audio::ChanOrder_Smpte); packet_sample_spec.channel_set().set_channel_mask(packet_channels); - source_proto = address::Proto_RTP; - dst_addr = test::new_address(123); + proto = address::Proto_RTP; + + src_addr = test::new_address(111); + dst_addr = test::new_address(222); + + src_ssrc = 0; + dst_ssrc = 0; } }; @@ -155,8 +179,7 @@ TEST(sender_sink, write) { SenderSlot* slot = create_slot(sender); CHECK(slot); - create_transport_endpoint(slot, address::Iface_AudioSource, source_proto, dst_addr, - queue); + create_transport_endpoint(slot, address::Iface_AudioSource, proto, dst_addr, queue); test::FrameWriter frame_writer(sender, sample_buffer_factory); @@ -195,8 +218,7 @@ TEST(sender_sink, frame_size_small) { SenderSlot* slot = create_slot(sender); CHECK(slot); - create_transport_endpoint(slot, address::Iface_AudioSource, source_proto, dst_addr, - queue); + create_transport_endpoint(slot, address::Iface_AudioSource, proto, dst_addr, queue); test::FrameWriter frame_writer(sender, sample_buffer_factory); @@ -235,8 +257,7 @@ TEST(sender_sink, frame_size_large) { SenderSlot* slot = create_slot(sender); CHECK(slot); - create_transport_endpoint(slot, address::Iface_AudioSource, source_proto, dst_addr, - queue); + create_transport_endpoint(slot, address::Iface_AudioSource, proto, dst_addr, queue); test::FrameWriter frame_writer(sender, sample_buffer_factory); @@ -269,8 +290,7 @@ TEST(sender_sink, channel_mapping_stereo_to_mono) { SenderSlot* slot = create_slot(sender); CHECK(slot); - create_transport_endpoint(slot, address::Iface_AudioSource, source_proto, dst_addr, - queue); + create_transport_endpoint(slot, address::Iface_AudioSource, proto, dst_addr, queue); test::FrameWriter frame_writer(sender, sample_buffer_factory); @@ -303,8 +323,7 @@ TEST(sender_sink, channel_mapping_mono_to_stereo) { SenderSlot* slot = create_slot(sender); CHECK(slot); - create_transport_endpoint(slot, address::Iface_AudioSource, source_proto, dst_addr, - queue); + create_transport_endpoint(slot, address::Iface_AudioSource, proto, dst_addr, queue); test::FrameWriter frame_writer(sender, sample_buffer_factory); @@ -337,8 +356,7 @@ TEST(sender_sink, sample_rate_mapping) { SenderSlot* slot = create_slot(sender); CHECK(slot); - create_transport_endpoint(slot, address::Iface_AudioSource, source_proto, dst_addr, - queue); + create_transport_endpoint(slot, address::Iface_AudioSource, proto, dst_addr, queue); test::FrameWriter frame_writer(sender, sample_buffer_factory); @@ -372,8 +390,7 @@ TEST(sender_sink, timestamp_mapping) { CHECK(sender.is_valid()); SenderSlot* slot = create_slot(sender); - create_transport_endpoint(slot, address::Iface_AudioSource, source_proto, dst_addr, - queue); + create_transport_endpoint(slot, address::Iface_AudioSource, proto, dst_addr, queue); test::FrameWriter frame_writer(sender, sample_buffer_factory); @@ -413,8 +430,7 @@ TEST(sender_sink, timestamp_mapping_remixing) { SenderSlot* slot = create_slot(sender); CHECK(slot); - create_transport_endpoint(slot, address::Iface_AudioSource, source_proto, dst_addr, - queue); + create_transport_endpoint(slot, address::Iface_AudioSource, proto, dst_addr, queue); test::FrameWriter frame_writer(sender, sample_buffer_factory); @@ -450,5 +466,138 @@ TEST(sender_sink, timestamp_mapping_remixing) { } } +// Check sender metrics for multiple remote participants (receiver). +IGNORE_TEST(sender_sink, metrics_participants) { + // TODO(gh-674): add test for multiple receivers +} + +// Check how sender returns metrics if provided buffer for metrics +// is smaller than needed. +IGNORE_TEST(sender_sink, metrics_truncation) { + // TODO(gh-674): add test for multiple receivers +} + +// Check how sender fills metrics from feedback reports of remote receiver. +TEST(sender_sink, metrics_feedback) { + enum { Rate = SampleRate, Chans = Chans_Stereo, MaxParties = 10 }; + + init(Rate, Chans, Rate, Chans); + + packet::Queue queue; + + SenderSink sender(make_config(), encoding_map, packet_factory, byte_buffer_factory, + sample_buffer_factory, arena); + CHECK(sender.is_valid()); + + SenderSlot* slot = create_slot(sender); + CHECK(slot); + create_transport_endpoint(slot, address::Iface_AudioSource, proto, dst_addr, queue); + + packet::Queue control_outbound_queue; + packet::IWriter* control_endpoint = + create_control_endpoint(slot, address::Iface_AudioControl, address::Proto_RTCP, + dst_addr, control_outbound_queue); + CHECK(control_endpoint); + + test::FrameWriter frame_writer(sender, sample_buffer_factory); + + test::PacketReader packet_reader(arena, queue, encoding_map, packet_factory, dst_addr, + PayloadType_Ch2); + + const core::nanoseconds_t unix_base = 1000000000000000; + + for (size_t nf = 0; nf < ManyFrames; nf++) { + frame_writer.write_samples(SamplesPerFrame, input_sample_spec, unix_base); + sender.refresh(frame_writer.refresh_ts()); + } + + for (size_t np = 0; np < ManyFrames / FramesPerPacket; np++) { + packet_reader.read_packet(SamplesPerPacket, packet_sample_spec, unix_base); + } + + CHECK(control_outbound_queue.size() > 0); + + { + SenderSlotMetrics slot_metrics; + SenderParticipantMetrics party_metrics[MaxParties]; + size_t party_metrics_size = MaxParties; + + slot->get_metrics(slot_metrics, party_metrics, &party_metrics_size); + + CHECK(slot_metrics.source_id != 0); + + src_ssrc = slot_metrics.source_id; + dst_ssrc = src_ssrc + 99999; + + UNSIGNED_LONGS_EQUAL(0, slot_metrics.num_participants); + UNSIGNED_LONGS_EQUAL(0, party_metrics_size); + } + + test::ControlWriter control_writer(*control_endpoint, packet_factory, + byte_buffer_factory, dst_addr, src_addr); + + control_writer.set_local_source(dst_ssrc); + control_writer.set_remote_source(src_ssrc); + + for (size_t np = 0; np < ManyFrames / FramesPerPacket; np++) { + const unsigned seed = (unsigned)np + 1; + + packet::LinkMetrics link_metrics; + link_metrics.ext_first_seqnum = seed * 100; + link_metrics.ext_last_seqnum = seed * 200; + link_metrics.total_packets = (seed * 200) - (seed * 100) + 1; + link_metrics.lost_packets = (int)seed * 40; + link_metrics.jitter = (int)seed * core::Millisecond * 50; + + audio::LatencyMetrics latency_metrics; + latency_metrics.niq_latency = (int)seed * core::Millisecond * 50; + latency_metrics.niq_stalling = (int)seed * core::Millisecond * 60; + latency_metrics.e2e_latency = (int)seed * core::Millisecond * 70; + + control_writer.set_link_metrics(link_metrics); + control_writer.set_latency_metrics(latency_metrics); + + control_writer.write_receiver_report(packet_sample_spec); + + for (size_t nf = 0; nf < FramesPerPacket; nf++) { + frame_writer.write_samples(SamplesPerFrame, input_sample_spec, unix_base); + sender.refresh(frame_writer.refresh_ts()); + } + packet_reader.read_packet(SamplesPerPacket, packet_sample_spec, unix_base); + + { + SenderSlotMetrics slot_metrics; + SenderParticipantMetrics party_metrics[MaxParties]; + size_t party_metrics_size = MaxParties; + + slot->get_metrics(slot_metrics, party_metrics, &party_metrics_size); + + UNSIGNED_LONGS_EQUAL(src_ssrc, slot_metrics.source_id); + UNSIGNED_LONGS_EQUAL(1, slot_metrics.num_participants); + UNSIGNED_LONGS_EQUAL(1, party_metrics_size); + + UNSIGNED_LONGS_EQUAL(link_metrics.ext_first_seqnum, + party_metrics[0].link.ext_first_seqnum); + UNSIGNED_LONGS_EQUAL(link_metrics.ext_last_seqnum, + party_metrics[0].link.ext_last_seqnum); + UNSIGNED_LONGS_EQUAL(link_metrics.total_packets, + party_metrics[0].link.total_packets); + UNSIGNED_LONGS_EQUAL(link_metrics.lost_packets, + party_metrics[0].link.lost_packets); + DOUBLES_EQUAL((double)link_metrics.jitter, + (double)party_metrics[0].link.jitter, core::Nanosecond); + + DOUBLES_EQUAL((double)latency_metrics.niq_latency, + (double)party_metrics[0].latency.niq_latency, + core::Microsecond * 16); + DOUBLES_EQUAL((double)latency_metrics.niq_stalling, + (double)party_metrics[0].latency.niq_stalling, + core::Microsecond * 16); + DOUBLES_EQUAL((double)latency_metrics.e2e_latency, + (double)party_metrics[0].latency.e2e_latency, core::Nanosecond); + } + } +} + } // namespace pipeline } // namespace roc