Skip to content

Commit

Permalink
ris: fix rabbitmq stream offset on reconnect (#390)
Browse files Browse the repository at this point in the history
  • Loading branch information
pablohoch authored Sep 4, 2023
1 parent 9162fc3 commit 6278db1
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .pkg
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
[rabbitmq-c]
[email protected]:motis-project/rabbitmq-c.git
branch=master
commit=3be31133abf01103c46e90f4aa6f7a32ac7a53c4
commit=76c17cc0c50579a20d725f4d212c8ee3f5947044
[nigiri]
[email protected]:motis-project/nigiri.git
branch=master
Expand Down
4 changes: 2 additions & 2 deletions .pkg.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
16994618710495999182
17326477996587457244
cista a6218e9cd9da9e1c7a70c1188e2faf9e082b7f1c
zlib fe8e13ffca867612951bc6baf114e5ac8b00f305
boost be5235eb2258d2ec19e32546ab767a62311d9b46
Expand Down Expand Up @@ -37,7 +37,7 @@ cpptoml 2133029ec819e8398e96fa679993b269f21ff9f2
rapidjson 9ece673b648b19d0f1995b82d402c9a6fc0eb277
ppr 14b5a9da9a4249f7becaf2e0e577ddbc9856d2db
pugixml 60175e80e2f5e97e027ac78f7e14c5acc009ce50
rabbitmq-c 3be31133abf01103c46e90f4aa6f7a32ac7a53c4
rabbitmq-c 76c17cc0c50579a20d725f4d212c8ee3f5947044
zstd bce26304a57dd504ae7ae51f384bf9292d7e3acb
tar 3a08b6575eb6a04e6b3d0977e6da3b61a91d62f2
clipper 904f0e6644c7f01c176443613be8f7788d59c658
Expand Down
4 changes: 3 additions & 1 deletion modules/ris/include/motis/ris/ribasis/ribasis_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <vector>

#include "rabbitmq/amqp.hpp"
#include "rabbitmq/stream.hpp"

#include "motis/core/common/unixtime.h"

Expand All @@ -19,7 +20,8 @@ struct receiver {
std::function<void(receiver&, std::vector<amqp::msg>&&)>;

receiver(rabbitmq_config config, source_status& status,
msg_handler_fn msg_handler);
msg_handler_fn msg_handler,
amqp::get_stream_options_fn_t get_stream_options);

void stop();

Expand Down
6 changes: 4 additions & 2 deletions modules/ris/src/ribasis/ribasis_receiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ std::string get_queue_id(amqp::login const& login) {
}

receiver::receiver(rabbitmq_config config, source_status& status,
msg_handler_fn msg_handler)
msg_handler_fn msg_handler,
amqp::get_stream_options_fn_t get_stream_options)
: config_{std::move(config)},
connection_{&config_.login_,
[this](std::string const& log_msg) { log(log_msg); }},
[this](std::string const& log_msg) { log(log_msg); },
std::move(get_stream_options)},
last_update_{now()},
msg_handler_{std::move(msg_handler)},
queue_id_{get_queue_id(config_.login_)},
Expand Down
60 changes: 34 additions & 26 deletions modules/ris/src/ris.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,32 +277,6 @@ struct ris::impl {
return;
}

if (config.resume_stream_) {
auto const queue_id = ribasis::get_queue_id(config.login_);
auto const stored_stream_offset = get_stream_offset(queue_id);
auto const stored_stream_timestamp = get_stream_timestamp(queue_id);
if (stored_stream_offset) {
auto resume = true;
if (config.max_resume_age_ && stored_stream_timestamp) {
if (*stored_stream_timestamp <
unixtime_duration_ago(config.max_resume_age_)) {
LOG(info) << prefix
<< ": last stream timestamp is too old, resuming at "
<< config.login_.stream_offset_;
resume = false;
}
}
if (resume) {
LOG(info) << prefix << ": resuming at stored stream offset "
<< *stored_stream_offset;
config.login_.numeric_stream_offset_ = *stored_stream_offset + 1;
}
} else {
LOG(info) << prefix << ": no stored stream offset found, resuming at "
<< config.login_.stream_offset_;
}
}

ribasis_receivers_.emplace_back(std::make_unique<ribasis::receiver>(
config, get_ribasis_status(prefix),
[this, d, sched](ribasis::receiver& rec,
Expand Down Expand Up @@ -347,6 +321,40 @@ struct ris::impl {
ctx::accesses_t{ctx::access_request{
to_res_id(::motis::module::global_res_id::RIS_DATA),
ctx::access_t::WRITE}});
},
[this, &config, prefix]() {
auto stream_opts = amqp::stream_options{};
if (config.resume_stream_) {
auto const queue_id = ribasis::get_queue_id(config.login_);
auto const stored_stream_offset = get_stream_offset(queue_id);
auto const stored_stream_timestamp =
get_stream_timestamp(queue_id);
stream_opts.stream_offset_ = config.login_.stream_offset_;
if (stored_stream_offset) {
auto resume = true;
if (config.max_resume_age_ && stored_stream_timestamp) {
if (*stored_stream_timestamp <
unixtime_duration_ago(config.max_resume_age_)) {
LOG(info)
<< prefix
<< ": last stream timestamp is too old, resuming at "
<< config.login_.stream_offset_;
resume = false;
}
}
if (resume) {
LOG(info) << prefix << ": resuming at stored stream offset "
<< *stored_stream_offset;
stream_opts.numeric_stream_offset_ =
*stored_stream_offset + 1;
}
} else {
LOG(info) << prefix
<< ": no stored stream offset found, resuming at "
<< config.login_.stream_offset_;
}
}
return stream_opts;
}));
});
}
Expand Down

0 comments on commit 6278db1

Please sign in to comment.