From 094e2ad9d3fd29dde2f0644e0e2ecb04aa07a67a Mon Sep 17 00:00:00 2001 From: Pablo Hoch Date: Wed, 30 Aug 2023 16:34:56 +0200 Subject: [PATCH] ris: fix rabbitmq stream resume position on reconnect --- .pkg | 2 +- .pkg.lock | 4 +- .../motis/ris/ribasis/ribasis_receiver.h | 4 +- modules/ris/src/ribasis/ribasis_receiver.cc | 6 +- modules/ris/src/ris.cc | 60 +++++++++++-------- 5 files changed, 44 insertions(+), 32 deletions(-) diff --git a/.pkg b/.pkg index b50b3102f..d1b1fa4fa 100644 --- a/.pkg +++ b/.pkg @@ -93,7 +93,7 @@ [rabbitmq-c] url=git@github.com:motis-project/rabbitmq-c.git branch=master - commit=3be31133abf01103c46e90f4aa6f7a32ac7a53c4 + commit=76c17cc0c50579a20d725f4d212c8ee3f5947044 [nigiri] url=git@github.com:motis-project/nigiri.git branch=master diff --git a/.pkg.lock b/.pkg.lock index a2e27cb17..9cbf1d745 100644 --- a/.pkg.lock +++ b/.pkg.lock @@ -1,4 +1,4 @@ -16994618710495999182 +17326477996587457244 cista a6218e9cd9da9e1c7a70c1188e2faf9e082b7f1c zlib fe8e13ffca867612951bc6baf114e5ac8b00f305 boost be5235eb2258d2ec19e32546ab767a62311d9b46 @@ -37,7 +37,7 @@ cpptoml 2133029ec819e8398e96fa679993b269f21ff9f2 rapidjson 9ece673b648b19d0f1995b82d402c9a6fc0eb277 ppr 14b5a9da9a4249f7becaf2e0e577ddbc9856d2db pugixml 60175e80e2f5e97e027ac78f7e14c5acc009ce50 -rabbitmq-c 3be31133abf01103c46e90f4aa6f7a32ac7a53c4 +rabbitmq-c 76c17cc0c50579a20d725f4d212c8ee3f5947044 zstd bce26304a57dd504ae7ae51f384bf9292d7e3acb tar 3a08b6575eb6a04e6b3d0977e6da3b61a91d62f2 clipper 904f0e6644c7f01c176443613be8f7788d59c658 diff --git a/modules/ris/include/motis/ris/ribasis/ribasis_receiver.h b/modules/ris/include/motis/ris/ribasis/ribasis_receiver.h index d687bf817..4547a60dc 100644 --- a/modules/ris/include/motis/ris/ribasis/ribasis_receiver.h +++ b/modules/ris/include/motis/ris/ribasis/ribasis_receiver.h @@ -4,6 +4,7 @@ #include #include "rabbitmq/amqp.hpp" +#include "rabbitmq/stream.hpp" #include "motis/core/common/unixtime.h" @@ -19,7 +20,8 @@ struct receiver { std::function&&)>; receiver(rabbitmq_config config, source_status& status, - msg_handler_fn msg_handler); + msg_handler_fn msg_handler, + amqp::get_stream_options_fn_t get_stream_options); void stop(); diff --git a/modules/ris/src/ribasis/ribasis_receiver.cc b/modules/ris/src/ribasis/ribasis_receiver.cc index 873dd2aee..4c7c2b9e4 100644 --- a/modules/ris/src/ribasis/ribasis_receiver.cc +++ b/modules/ris/src/ribasis/ribasis_receiver.cc @@ -15,10 +15,12 @@ std::string get_queue_id(amqp::login const& login) { } receiver::receiver(rabbitmq_config config, source_status& status, - msg_handler_fn msg_handler) + msg_handler_fn msg_handler, + amqp::get_stream_options_fn_t get_stream_options) : config_{std::move(config)}, connection_{&config_.login_, - [this](std::string const& log_msg) { log(log_msg); }}, + [this](std::string const& log_msg) { log(log_msg); }, + std::move(get_stream_options)}, last_update_{now()}, msg_handler_{std::move(msg_handler)}, queue_id_{get_queue_id(config_.login_)}, diff --git a/modules/ris/src/ris.cc b/modules/ris/src/ris.cc index b850f4dc5..c5c2190b1 100644 --- a/modules/ris/src/ris.cc +++ b/modules/ris/src/ris.cc @@ -277,32 +277,6 @@ struct ris::impl { return; } - if (config.resume_stream_) { - auto const queue_id = ribasis::get_queue_id(config.login_); - auto const stored_stream_offset = get_stream_offset(queue_id); - auto const stored_stream_timestamp = get_stream_timestamp(queue_id); - if (stored_stream_offset) { - auto resume = true; - if (config.max_resume_age_ && stored_stream_timestamp) { - if (*stored_stream_timestamp < - unixtime_duration_ago(config.max_resume_age_)) { - LOG(info) << prefix - << ": last stream timestamp is too old, resuming at " - << config.login_.stream_offset_; - resume = false; - } - } - if (resume) { - LOG(info) << prefix << ": resuming at stored stream offset " - << *stored_stream_offset; - config.login_.numeric_stream_offset_ = *stored_stream_offset + 1; - } - } else { - LOG(info) << prefix << ": no stored stream offset found, resuming at " - << config.login_.stream_offset_; - } - } - ribasis_receivers_.emplace_back(std::make_unique( config, get_ribasis_status(prefix), [this, d, sched](ribasis::receiver& rec, @@ -347,6 +321,40 @@ struct ris::impl { ctx::accesses_t{ctx::access_request{ to_res_id(::motis::module::global_res_id::RIS_DATA), ctx::access_t::WRITE}}); + }, + [this, &config, prefix]() { + auto stream_opts = amqp::stream_options{}; + if (config.resume_stream_) { + auto const queue_id = ribasis::get_queue_id(config.login_); + auto const stored_stream_offset = get_stream_offset(queue_id); + auto const stored_stream_timestamp = + get_stream_timestamp(queue_id); + stream_opts.stream_offset_ = config.login_.stream_offset_; + if (stored_stream_offset) { + auto resume = true; + if (config.max_resume_age_ && stored_stream_timestamp) { + if (*stored_stream_timestamp < + unixtime_duration_ago(config.max_resume_age_)) { + LOG(info) + << prefix + << ": last stream timestamp is too old, resuming at " + << config.login_.stream_offset_; + resume = false; + } + } + if (resume) { + LOG(info) << prefix << ": resuming at stored stream offset " + << *stored_stream_offset; + stream_opts.numeric_stream_offset_ = + *stored_stream_offset + 1; + } + } else { + LOG(info) << prefix + << ": no stored stream offset found, resuming at " + << config.login_.stream_offset_; + } + } + return stream_opts; })); }); }