From 965ed7397fffaaf29a312ec63286ae349c9e4dd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Wed, 20 Nov 2024 10:53:20 +0100 Subject: [PATCH] Use chrono steady clock for frame timing (#173) * Use chrono steady_clock for frame timing * Fix formatting for humble * Use chrono in png snapshot streamer * Don't use std::optional --- include/web_video_server/image_streamer.hpp | 9 ++++---- include/web_video_server/jpeg_streamers.hpp | 4 ++-- include/web_video_server/libav_streamer.hpp | 6 +++-- include/web_video_server/multipart_stream.hpp | 15 ++++++------ include/web_video_server/png_streamers.hpp | 4 ++-- .../ros_compressed_streamer.hpp | 6 ++--- include/web_video_server/web_video_server.hpp | 2 +- src/image_streamer.cpp | 10 ++++---- src/jpeg_streamers.cpp | 14 +++++++---- src/libav_streamer.cpp | 18 +++++++++------ src/multipart_stream.cpp | 23 +++++++++++-------- src/png_streamers.cpp | 13 +++++++---- src/ros_compressed_streamer.cpp | 15 ++++++------ src/web_video_server.cpp | 4 ++-- 14 files changed, 82 insertions(+), 61 deletions(-) diff --git a/include/web_video_server/image_streamer.hpp b/include/web_video_server/image_streamer.hpp index 74c75f9..0e001c5 100644 --- a/include/web_video_server/image_streamer.hpp +++ b/include/web_video_server/image_streamer.hpp @@ -30,6 +30,7 @@ #pragma once +#include #include #include @@ -64,7 +65,7 @@ class ImageStreamer /** * Restreams the last received image frame if older than max_age. */ - virtual void restreamFrame(double max_age) = 0; + virtual void restreamFrame(std::chrono::duration max_age) = 0; std::string getTopic() { @@ -94,8 +95,8 @@ class ImageTransportImageStreamer : public ImageStreamer protected: virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg); - virtual void sendImage(const cv::Mat &, const rclcpp::Time & time) = 0; - virtual void restreamFrame(double max_age); + virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time) = 0; + virtual void restreamFrame(std::chrono::duration max_age); virtual void initialize(const cv::Mat &); image_transport::Subscriber image_sub_; @@ -105,7 +106,7 @@ class ImageTransportImageStreamer : public ImageStreamer std::string default_transport_; std::string qos_profile_name_; - rclcpp::Time last_frame; + std::chrono::steady_clock::time_point last_frame_; cv::Mat output_size_image; std::mutex send_mutex_; diff --git a/include/web_video_server/jpeg_streamers.hpp b/include/web_video_server/jpeg_streamers.hpp index 6946215..364357f 100644 --- a/include/web_video_server/jpeg_streamers.hpp +++ b/include/web_video_server/jpeg_streamers.hpp @@ -52,7 +52,7 @@ class MjpegStreamer : public ImageTransportImageStreamer ~MjpegStreamer(); protected: - virtual void sendImage(const cv::Mat &, const rclcpp::Time & time); + virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time); private: MultipartStream stream_; @@ -78,7 +78,7 @@ class JpegSnapshotStreamer : public ImageTransportImageStreamer ~JpegSnapshotStreamer(); protected: - virtual void sendImage(const cv::Mat &, const rclcpp::Time & time); + virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time); private: int quality_; diff --git a/include/web_video_server/libav_streamer.hpp b/include/web_video_server/libav_streamer.hpp index 2f4db10..12914bb 100644 --- a/include/web_video_server/libav_streamer.hpp +++ b/include/web_video_server/libav_streamer.hpp @@ -42,6 +42,7 @@ extern "C" #include } +#include #include #include @@ -66,7 +67,7 @@ class LibavStreamer : public ImageTransportImageStreamer protected: virtual void initializeEncoder(); - virtual void sendImage(const cv::Mat &, const rclcpp::Time & time); + virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time); virtual void initialize(const cv::Mat &); AVFormatContext * format_context_; const AVCodec * codec_; @@ -78,8 +79,9 @@ class LibavStreamer : public ImageTransportImageStreamer private: AVFrame * frame_; struct SwsContext * sws_context_; - rclcpp::Time first_image_timestamp_; std::mutex encode_mutex_; + bool first_image_received_; + std::chrono::steady_clock::time_point first_image_time_; std::string format_name_; std::string codec_name_; diff --git a/include/web_video_server/multipart_stream.hpp b/include/web_video_server/multipart_stream.hpp index d372ba8..fae1352 100644 --- a/include/web_video_server/multipart_stream.hpp +++ b/include/web_video_server/multipart_stream.hpp @@ -43,7 +43,7 @@ namespace web_video_server struct PendingFooter { - rclcpp::Time timestamp; + std::chrono::steady_clock::time_point timestamp; std::weak_ptr contents; }; @@ -51,26 +51,27 @@ class MultipartStream { public: MultipartStream( - std::function get_now, async_web_server_cpp::HttpConnectionPtr & connection, const std::string & boundry = "boundarydonotcross", std::size_t max_queue_size = 1); void sendInitialHeader(); - void sendPartHeader(const rclcpp::Time & time, const std::string & type, size_t payload_size); - void sendPartFooter(const rclcpp::Time & time); + void sendPartHeader( + const std::chrono::steady_clock::time_point & time, const std::string & type, + size_t payload_size); + void sendPartFooter(const std::chrono::steady_clock::time_point & time); void sendPartAndClear( - const rclcpp::Time & time, const std::string & type, + const std::chrono::steady_clock::time_point & time, const std::string & type, std::vector & data); void sendPart( - const rclcpp::Time & time, const std::string & type, const boost::asio::const_buffer & buffer, + const std::chrono::steady_clock::time_point & time, const std::string & type, + const boost::asio::const_buffer & buffer, async_web_server_cpp::HttpConnection::ResourcePtr resource); private: bool isBusy(); private: - std::function get_now_; const std::size_t max_queue_size_; async_web_server_cpp::HttpConnectionPtr connection_; std::string boundry_; diff --git a/include/web_video_server/png_streamers.hpp b/include/web_video_server/png_streamers.hpp index 14f370e..c261fe6 100644 --- a/include/web_video_server/png_streamers.hpp +++ b/include/web_video_server/png_streamers.hpp @@ -51,7 +51,7 @@ class PngStreamer : public ImageTransportImageStreamer ~PngStreamer(); protected: - virtual void sendImage(const cv::Mat &, const rclcpp::Time & time); + virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time); virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg); private: @@ -78,7 +78,7 @@ class PngSnapshotStreamer : public ImageTransportImageStreamer ~PngSnapshotStreamer(); protected: - virtual void sendImage(const cv::Mat &, const rclcpp::Time & time); + virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time); virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg); private: diff --git a/include/web_video_server/ros_compressed_streamer.hpp b/include/web_video_server/ros_compressed_streamer.hpp index aceb076..9fc51bd 100644 --- a/include/web_video_server/ros_compressed_streamer.hpp +++ b/include/web_video_server/ros_compressed_streamer.hpp @@ -51,18 +51,18 @@ class RosCompressedStreamer : public ImageStreamer rclcpp::Node::SharedPtr node); ~RosCompressedStreamer(); virtual void start(); - virtual void restreamFrame(double max_age); + virtual void restreamFrame(std::chrono::duration max_age); protected: virtual void sendImage( const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg, - const rclcpp::Time & time); + const std::chrono::steady_clock::time_point & time); private: void imageCallback(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg); MultipartStream stream_; rclcpp::Subscription::SharedPtr image_sub_; - rclcpp::Time last_frame; + std::chrono::steady_clock::time_point last_frame_; sensor_msgs::msg::CompressedImage::ConstSharedPtr last_msg; std::mutex send_mutex_; std::string qos_profile_name_; diff --git a/include/web_video_server/web_video_server.hpp b/include/web_video_server/web_video_server.hpp index 9f74134..6e9bd95 100644 --- a/include/web_video_server/web_video_server.hpp +++ b/include/web_video_server/web_video_server.hpp @@ -94,7 +94,7 @@ class WebVideoServer : public rclcpp::Node const char * begin, const char * end); private: - void restreamFrames(double max_age); + void restreamFrames(std::chrono::duration max_age); void cleanup_inactive_streams(); rclcpp::TimerBase::SharedPtr cleanup_timer_; diff --git a/src/image_streamer.cpp b/src/image_streamer.cpp index 5867db6..8a83958 100644 --- a/src/image_streamer.cpp +++ b/src/image_streamer.cpp @@ -110,16 +110,16 @@ void ImageTransportImageStreamer::initialize(const cv::Mat &) { } -void ImageTransportImageStreamer::restreamFrame(double max_age) +void ImageTransportImageStreamer::restreamFrame(std::chrono::duration max_age) { if (inactive_ || !initialized_) { return; } try { - if (last_frame + rclcpp::Duration::from_seconds(max_age) < node_->now() ) { + if (last_frame_ + max_age < std::chrono::steady_clock::now()) { std::scoped_lock lock(send_mutex_); // don't update last_frame, it may remain an old value. - sendImage(output_size_image, node_->now()); + sendImage(output_size_image, std::chrono::steady_clock::now()); } } catch (boost::system::system_error & e) { // happens when client disconnects @@ -199,8 +199,8 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::C initialized_ = true; } - last_frame = node_->now(); - sendImage(output_size_image, msg->header.stamp); + last_frame_ = std::chrono::steady_clock::now(); + sendImage(output_size_image, last_frame_); } catch (cv_bridge::Exception & e) { auto & clk = *node_->get_clock(); RCLCPP_ERROR_THROTTLE(node_->get_logger(), clk, 40, "cv_bridge exception: %s", e.what()); diff --git a/src/jpeg_streamers.cpp b/src/jpeg_streamers.cpp index 9123e36..bfc6c62 100644 --- a/src/jpeg_streamers.cpp +++ b/src/jpeg_streamers.cpp @@ -38,7 +38,7 @@ MjpegStreamer::MjpegStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) : ImageTransportImageStreamer(request, connection, node), - stream_(std::bind(&rclcpp::Node::now, node), connection) + stream_(connection) { quality_ = request.get_query_param_value_or_default("quality", 95); stream_.sendInitialHeader(); @@ -50,7 +50,9 @@ MjpegStreamer::~MjpegStreamer() std::scoped_lock lock(send_mutex_); // protects sendImage. } -void MjpegStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time) +void MjpegStreamer::sendImage( + const cv::Mat & img, + const std::chrono::steady_clock::time_point & time) { std::vector encode_params; encode_params.push_back(cv::IMWRITE_JPEG_QUALITY); @@ -94,7 +96,9 @@ JpegSnapshotStreamer::~JpegSnapshotStreamer() std::scoped_lock lock(send_mutex_); // protects sendImage. } -void JpegSnapshotStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time) +void JpegSnapshotStreamer::sendImage( + const cv::Mat & img, + const std::chrono::steady_clock::time_point & time) { std::vector encode_params; encode_params.push_back(cv::IMWRITE_JPEG_QUALITY); @@ -104,7 +108,9 @@ void JpegSnapshotStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & t cv::imencode(".jpeg", img, encoded_buffer, encode_params); char stamp[20]; - snprintf(stamp, sizeof(stamp), "%.06lf", time.seconds()); + snprintf( + stamp, sizeof(stamp), "%.06lf", + std::chrono::duration_cast>(time.time_since_epoch()).count()); async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok) .header("Connection", "close") .header("Server", "web_video_server") diff --git a/src/libav_streamer.cpp b/src/libav_streamer.cpp index 126fa6b..4814c4e 100644 --- a/src/libav_streamer.cpp +++ b/src/libav_streamer.cpp @@ -44,9 +44,9 @@ LibavStreamer::LibavStreamer( const std::string & format_name, const std::string & codec_name, const std::string & content_type) : ImageTransportImageStreamer(request, connection, node), format_context_(0), codec_(0), - codec_context_(0), video_stream_(0), frame_(0), sws_context_(0), first_image_timestamp_(0), - format_name_(format_name), codec_name_(codec_name), content_type_(content_type), opt_(0), - io_buffer_(0) + codec_context_(0), video_stream_(0), frame_(0), sws_context_(0), + first_image_received_(false), first_image_time_(), format_name_(format_name), + codec_name_(codec_name), content_type_(content_type), opt_(0), io_buffer_(0) { bitrate_ = request.get_query_param_value_or_default("bitrate", 100000); qmin_ = request.get_query_param_value_or_default("qmin", 10); @@ -215,11 +215,14 @@ void LibavStreamer::initializeEncoder() { } -void LibavStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time) +void LibavStreamer::sendImage( + const cv::Mat & img, + const std::chrono::steady_clock::time_point & time) { std::scoped_lock lock(encode_mutex_); - if (0 == first_image_timestamp_.nanoseconds()) { - first_image_timestamp_ = time; + if (!first_image_received_) { + first_image_received_ = true; + first_image_time_ = time; } AVPixelFormat input_coding_format = AV_PIX_FMT_BGR24; @@ -274,7 +277,8 @@ void LibavStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time) std::size_t size; uint8_t * output_buf; - double seconds = (time - first_image_timestamp_).seconds(); + double seconds = std::chrono::duration_cast>( + time - first_image_time_).count(); // Encode video at 1/0.95 to minimize delay pkt->pts = (int64_t)(seconds / av_q2d(video_stream_->time_base) * 0.95); if (pkt->pts <= 0) { diff --git a/src/multipart_stream.cpp b/src/multipart_stream.cpp index c43de8a..b26d229 100644 --- a/src/multipart_stream.cpp +++ b/src/multipart_stream.cpp @@ -35,11 +35,10 @@ namespace web_video_server { MultipartStream::MultipartStream( - std::function get_now, async_web_server_cpp::HttpConnectionPtr & connection, const std::string & boundry, std::size_t max_queue_size) -: get_now_(get_now), connection_(connection), boundry_(boundry), max_queue_size_(max_queue_size) +: connection_(connection), boundry_(boundry), max_queue_size_(max_queue_size) {} void MultipartStream::sendInitialHeader() @@ -58,11 +57,13 @@ void MultipartStream::sendInitialHeader() } void MultipartStream::sendPartHeader( - const rclcpp::Time & time, const std::string & type, + const std::chrono::steady_clock::time_point & time, const std::string & type, size_t payload_size) { char stamp[20]; - snprintf(stamp, sizeof(stamp), "%.06lf", time.seconds()); + snprintf( + stamp, sizeof(stamp), "%.06lf", + std::chrono::duration_cast>(time.time_since_epoch()).count()); std::shared_ptr> headers( new std::vector()); headers->push_back(async_web_server_cpp::HttpHeader("Content-type", type)); @@ -73,7 +74,7 @@ void MultipartStream::sendPartHeader( connection_->write(async_web_server_cpp::HttpReply::to_buffers(*headers), headers); } -void MultipartStream::sendPartFooter(const rclcpp::Time & time) +void MultipartStream::sendPartFooter(const std::chrono::steady_clock::time_point & time) { std::shared_ptr str(new std::string("\r\n--" + boundry_ + "\r\n")); PendingFooter pf; @@ -84,7 +85,7 @@ void MultipartStream::sendPartFooter(const rclcpp::Time & time) } void MultipartStream::sendPartAndClear( - const rclcpp::Time & time, const std::string & type, + const std::chrono::steady_clock::time_point & time, const std::string & type, std::vector & data) { if (!isBusy()) { @@ -95,7 +96,7 @@ void MultipartStream::sendPartAndClear( } void MultipartStream::sendPart( - const rclcpp::Time & time, const std::string & type, + const std::chrono::steady_clock::time_point & time, const std::string & type, const boost::asio::const_buffer & buffer, async_web_server_cpp::HttpConnection::ResourcePtr resource) { @@ -108,13 +109,15 @@ void MultipartStream::sendPart( bool MultipartStream::isBusy() { - rclcpp::Time currentTime = get_now_(); + auto current_time = std::chrono::steady_clock::now(); while (!pending_footers_.empty()) { if (pending_footers_.front().contents.expired()) { pending_footers_.pop(); } else { - rclcpp::Time footerTime = pending_footers_.front().timestamp; - if ((currentTime - footerTime).seconds() > 0.5) { + auto footer_time = pending_footers_.front().timestamp; + if (std::chrono::duration_cast>( + (current_time - footer_time)).count() > 0.5) + { pending_footers_.pop(); } else { break; diff --git a/src/png_streamers.cpp b/src/png_streamers.cpp index d801097..c3c9327 100644 --- a/src/png_streamers.cpp +++ b/src/png_streamers.cpp @@ -42,8 +42,7 @@ namespace web_video_server PngStreamer::PngStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) -: ImageTransportImageStreamer(request, connection, node), - stream_(std::bind(&rclcpp::Node::now, node), connection) +: ImageTransportImageStreamer(request, connection, node), stream_(connection) { quality_ = request.get_query_param_value_or_default("quality", 3); stream_.sendInitialHeader(); @@ -66,7 +65,7 @@ cv::Mat PngStreamer::decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & } } -void PngStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time) +void PngStreamer::sendImage(const cv::Mat & img, const std::chrono::steady_clock::time_point & time) { std::vector encode_params; encode_params.push_back(cv::IMWRITE_PNG_COMPRESSION); @@ -121,7 +120,9 @@ cv::Mat PngSnapshotStreamer::decodeImage(const sensor_msgs::msg::Image::ConstSha } } -void PngSnapshotStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time) +void PngSnapshotStreamer::sendImage( + const cv::Mat & img, + const std::chrono::steady_clock::time_point & time) { std::vector encode_params; encode_params.push_back(cv::IMWRITE_PNG_COMPRESSION); @@ -131,7 +132,9 @@ void PngSnapshotStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & ti cv::imencode(".png", img, encoded_buffer, encode_params); char stamp[20]; - snprintf(stamp, sizeof(stamp), "%.06lf", time.seconds()); + snprintf( + stamp, sizeof(stamp), "%.06lf", + std::chrono::duration_cast>(time.time_since_epoch()).count()); async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok) .header("Connection", "close") .header("Server", "web_video_server") diff --git a/src/ros_compressed_streamer.cpp b/src/ros_compressed_streamer.cpp index 8c906bf..76c965d 100644 --- a/src/ros_compressed_streamer.cpp +++ b/src/ros_compressed_streamer.cpp @@ -36,7 +36,7 @@ namespace web_video_server RosCompressedStreamer::RosCompressedStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) -: ImageStreamer(request, connection, node), stream_(std::bind(&rclcpp::Node::now, node), connection) +: ImageStreamer(request, connection, node), stream_(connection) { stream_.sendInitialHeader(); qos_profile_name_ = request.get_query_param_value_or_default("qos_profile", "default"); @@ -74,21 +74,22 @@ void RosCompressedStreamer::start() std::bind(&RosCompressedStreamer::imageCallback, this, std::placeholders::_1)); } -void RosCompressedStreamer::restreamFrame(double max_age) +void RosCompressedStreamer::restreamFrame(std::chrono::duration max_age) { if (inactive_ || (last_msg == 0)) { return; } - if (last_frame + rclcpp::Duration::from_seconds(max_age) < node_->now() ) { + if (last_frame_ + max_age < std::chrono::steady_clock::now()) { std::scoped_lock lock(send_mutex_); - sendImage(last_msg, node_->now() ); // don't update last_frame, it may remain an old value. + // don't update last_frame, it may remain an old value. + sendImage(last_msg, std::chrono::steady_clock::now()); } } void RosCompressedStreamer::sendImage( const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg, - const rclcpp::Time & time) + const std::chrono::steady_clock::time_point & time) { try { std::string content_type; @@ -130,8 +131,8 @@ void RosCompressedStreamer::imageCallback( { std::scoped_lock lock(send_mutex_); // protects last_msg and last_frame last_msg = msg; - last_frame = rclcpp::Time(msg->header.stamp); - sendImage(last_msg, last_frame); + last_frame_ = std::chrono::steady_clock::now(); + sendImage(last_msg, last_frame_); } diff --git a/src/web_video_server.cpp b/src/web_video_server.cpp index 6366d96..5f548c3 100644 --- a/src/web_video_server.cpp +++ b/src/web_video_server.cpp @@ -110,7 +110,7 @@ WebVideoServer::WebVideoServer(const rclcpp::NodeOptions & options) RCLCPP_INFO(get_logger(), "Waiting For connections on %s:%d", address_.c_str(), port_); if (publish_rate_ > 0) { - create_wall_timer(1s / publish_rate_, [this]() {restreamFrames(1.0 / publish_rate_);}); + create_wall_timer(1s / publish_rate_, [this]() {restreamFrames(1s / publish_rate_);}); } cleanup_timer_ = create_wall_timer(500ms, [this]() {cleanup_inactive_streams();}); @@ -123,7 +123,7 @@ WebVideoServer::~WebVideoServer() server_->stop(); } -void WebVideoServer::restreamFrames(double max_age) +void WebVideoServer::restreamFrames(std::chrono::duration max_age) { std::scoped_lock lock(subscriber_mutex_);