From dd313a6f9534eb4b285ab21bf248d97b59c4fd99 Mon Sep 17 00:00:00 2001 From: Yura Sorokin Date: Wed, 5 Jun 2024 01:59:53 +0200 Subject: [PATCH] PS-9244 postfix: binlog_server fails to resume copying binlog file after server restart (#51) https://perconadev.atlassian.net/browse/PS-9244 Completely reworked the logic of the 'binsrv::events::reader_context' class - it is now implemented as a state machine expecting the following sequences of events within the binlogs: (ROTATE(artificial) FORMAT_DESCRIPTION * (ROTATE|STOP)?)+ We now properly handle the case when after improper MySQL server termination the last used binary log may end up not having a STOP or ROTATE event as its last one. 'binlog_streaming.pull_mode' MTR test case now also checks for killing the server while Binlog Server Utility is running in the background in 'pull' mode. --- mtr/binlog_streaming/r/pull_mode.result | 23 ++- mtr/binlog_streaming/t/pull_mode.test | 47 +++-- src/app.cpp | 13 +- src/binsrv/event/event.cpp | 46 +---- src/binsrv/event/reader_context.cpp | 250 ++++++++++++++++-------- src/binsrv/event/reader_context.hpp | 22 ++- src/easymysql/connection.cpp | 2 +- 7 files changed, 254 insertions(+), 149 deletions(-) diff --git a/mtr/binlog_streaming/r/pull_mode.result b/mtr/binlog_streaming/r/pull_mode.result index ef41662..6833573 100644 --- a/mtr/binlog_streaming/r/pull_mode.result +++ b/mtr/binlog_streaming/r/pull_mode.result @@ -24,29 +24,38 @@ INSERT INTO t1 VALUES(DEFAULT); *** Determining the second binary log name. -*** Filling the table with some more data and dropping the table. +*** Filling the table with some more data. +INSERT INTO t1 VALUES(DEFAULT); + +*** Killing the server and restarting it after a pause to test for the +*** missing ROTATE / STOP event at the end of the binary log . +# Kill the server +# restart + +*** Determining the third binary log name. + +*** Filling the table with some more data again and dropping the table. INSERT INTO t1 VALUES(DEFAULT); DROP TABLE t1; -*** FLUSHING the binlog one more time to make sure that the second one +*** FLUSHING the binlog one more time to make sure that the third one *** is no longer open. FLUSH BINARY LOGS; -*** Determining the third binary log name. +*** Determining the fourth binary log name. -*** Waiting till Binlog Server Utility starts processing the third +*** Waiting till Binlog Server Utility starts processing the fourth *** binary log. *** Sending SIGTERM signal to the Binlog Server Utility and waiting for *** the process to terminate -*** Checking that the Binlog Server utility detected an empty storage -include/assert_grep.inc [Binlog storage must be initialized on an empty directory] - *** Comparing server and downloaded versions of the first binlog file *** Comparing server and downloaded versions of the second binlog file +*** Comparing server and downloaded versions of the third binlog file + *** Removing the Binlog Server utility storage directory. *** Removing the Binlog Server utility log file. diff --git a/mtr/binlog_streaming/t/pull_mode.test b/mtr/binlog_streaming/t/pull_mode.test index 39bf11a..cc38650 100644 --- a/mtr/binlog_streaming/t/pull_mode.test +++ b/mtr/binlog_streaming/t/pull_mode.test @@ -60,30 +60,47 @@ INSERT INTO t1 VALUES(DEFAULT); --let $second_binlog = query_get_value(SHOW MASTER STATUS, File, 1) --echo ---echo *** Filling the table with some more data and dropping the table. +--echo *** Filling the table with some more data. +INSERT INTO t1 VALUES(DEFAULT); + +--echo +--echo *** Killing the server and restarting it after a pause to test for the +--echo *** missing ROTATE / STOP event at the end of the binary log . +--source include/kill_mysqld.inc +# Sleeping here deliberately so that the Binlog Server Utility would encounter +# read timeout and would try to reconnect several times. +--sleep 10 +--source include/start_mysqld.inc + +--echo +--echo *** Determining the third binary log name. +--let $third_binlog = query_get_value(SHOW MASTER STATUS, File, 1) + +--echo +--echo *** Filling the table with some more data again and dropping the table. INSERT INTO t1 VALUES(DEFAULT); DROP TABLE t1; --echo ---echo *** FLUSHING the binlog one more time to make sure that the second one +--echo *** FLUSHING the binlog one more time to make sure that the third one --echo *** is no longer open. FLUSH BINARY LOGS; --echo ---echo *** Determining the third binary log name. ---let $third_binlog = query_get_value(SHOW MASTER STATUS, File, 1) +--echo *** Determining the fourth binary log name. +--let $fourth_binlog = query_get_value(SHOW MASTER STATUS, File, 1) --echo ---echo *** Waiting till Binlog Server Utility starts processing the third +--echo *** Waiting till Binlog Server Utility starts processing the fourth --echo *** binary log. # We grep the Binlog Server Utility log file in a loop until we encounter the -# third binary log file name. +# fourth binary log file name. --let $max_number_of_attempts = 60 --let $iteration = 0 while($iteration < $max_number_of_attempts) { --error 0, 1 - --exec grep --silent $third_binlog $binsrv_log_path + --exec grep --silent $fourth_binlog $binsrv_log_path --let $grep_status = $__error if ($grep_status == 0) { @@ -97,7 +114,7 @@ while($iteration < $max_number_of_attempts) } if ($grep_status != 0) { - --die The Binlog Server Utility did not start processing the third binary log. + --die The Binlog Server Utility did not start processing the fourth binary log. } --echo @@ -121,14 +138,6 @@ EOF --remove_file $binsrv_pid_file ---echo ---echo *** Checking that the Binlog Server utility detected an empty storage ---let $assert_text = Binlog storage must be initialized on an empty directory ---let $assert_file = $binsrv_log_path ---let $assert_count = 1 ---let $assert_select = binlog storage initialized on an empty directory ---source include/assert_grep.inc - --echo --echo *** Comparing server and downloaded versions of the first binlog file --let $local_file = $binlog_base_dir/$first_binlog @@ -141,6 +150,12 @@ EOF --let $storage_object = $binsrv_storage_path/$second_binlog --source ../include/diff_with_storage_object.inc +--echo +--echo *** Comparing server and downloaded versions of the third binlog file +--let $local_file = $binlog_base_dir/$third_binlog +--let $storage_object = $binsrv_storage_path/$third_binlog +--source ../include/diff_with_storage_object.inc + # cleaning up --source ../include/tear_down_binsrv_environment.inc diff --git a/src/app.cpp b/src/app.cpp index a192923..9069bc5 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -45,6 +45,7 @@ #include "binsrv/storage.hpp" #include "binsrv/storage_backend_factory.hpp" +#include "binsrv/event//checksum_algorithm_type.hpp" #include "binsrv/event/code_type.hpp" #include "binsrv/event/event.hpp" #include "binsrv/event/flag_type.hpp" @@ -222,6 +223,12 @@ void process_binlog_event(const binsrv::event::event ¤t_event, skip_open_binlog = false; } else { + // in case when the server was not shut down properly, it won't have + // ROTATE or STOP event as the last one in the binlog, so here we + // handle this case by closing the old binlog and opening a new one + if (storage.is_binlog_open()) { + storage.close_binlog(); + } storage.open_binlog(current_rotate_body.get_binlog()); } } @@ -291,7 +298,11 @@ void receive_binlog_events( util::const_byte_span portion; - binsrv::event::reader_context context{}; + // TODO: change this checksum algorithm to the value of the + // @source_binlog_checksum variable that we set in the + // 'connection::switch_to_replication()' + binsrv::event::reader_context context{ + binsrv::event::checksum_algorithm_type::off}; // if binlog is still open, there is no sense to close it and re-open // instead, we will just instruct this loop to process the diff --git a/src/binsrv/event/event.cpp b/src/binsrv/event/event.cpp index a150601..d986a4f 100644 --- a/src/binsrv/event/event.cpp +++ b/src/binsrv/event/event.cpp @@ -27,7 +27,6 @@ #include "binsrv/event/checksum_algorithm_type.hpp" #include "binsrv/event/code_type.hpp" -#include "binsrv/event/flag_type.hpp" #include "binsrv/event/generic_body.hpp" #include "binsrv/event/generic_post_header.hpp" #include "binsrv/event/protocol_traits_fwd.hpp" @@ -59,18 +58,12 @@ event::event(reader_context &context, util::const_byte_span portion) // format_description_events always include event footers with checksums footer_size = footer::size_in_bytes; } else { - if (context.has_fde_processed()) { - // if format_description event has already been encountered, we determine - // whether there is a footer in the event from it - footer_size = (context.get_current_checksum_algorithm() == - checksum_algorithm_type::crc32 - ? footer::size_in_bytes - : 0U); - } else { - // we get in this branch only for the very first artificial rotate event - // and in this case it does not include the footer - footer_size = 0U; - } + // we determine whether there is a footer in the event from the + // reader_context + footer_size = (context.get_current_checksum_algorithm() == + checksum_algorithm_type::crc32 + ? footer::size_in_bytes + : 0U); } const std::size_t event_size = std::size(portion); @@ -80,31 +73,8 @@ event::event(reader_context &context, util::const_byte_span portion) "header"); } std::size_t post_header_size{0U}; - if (context.has_fde_processed()) { - // if format_description event has already been encountered in the stream, - // we take post-header length from it - post_header_size = context.get_current_post_header_length(code); - } else { - // we expect that we can receive only 2 events before there is a - // format_description event we can refer to: rotate (with artificial - // flag) and format description event itself - post_header_size = get_expected_post_header_length(code); - switch (code) { - case code_type::rotate: - if (!common_header_.get_flags().has_element(flag_type::artificial)) { - util::exception_location().raise( - "rotate event without preceding format_description event must have " - "'artificial' flag set"); - } - break; - case code_type::format_description: - break; - default: - util::exception_location().raise( - "this type of event must be preceded by a format_description event"); - } - assert(post_header_size != unspecified_post_header_length); - } + post_header_size = context.get_current_post_header_length(code); + assert(post_header_size != unspecified_post_header_length); const std::size_t group_size = common_header::size_in_bytes + post_header_size + footer_size; diff --git a/src/binsrv/event/reader_context.cpp b/src/binsrv/event/reader_context.cpp index 7ce089d..33892b8 100644 --- a/src/binsrv/event/reader_context.cpp +++ b/src/binsrv/event/reader_context.cpp @@ -30,60 +30,169 @@ namespace binsrv::event { -reader_context::reader_context() - : checksum_algorithm_{checksum_algorithm_type::off} {} +reader_context::reader_context(checksum_algorithm_type checksum_algorithm) + : checksum_algorithm_{checksum_algorithm}, + post_header_lengths_{event::expected_post_header_lengths} {} void reader_context::process_event(const event ¤t_event) { + bool processed{false}; + while (!processed) { + switch (state_) { + case state_type::initial: + processed = process_event_in_initial_state(current_event); + break; + case state_type::rotate_artificial_processed: + processed = + process_event_in_rotate_artificial_processed_state(current_event); + break; + case state_type::format_description_processed: + processed = + process_event_in_format_description_processed_state(current_event); + break; + default: + assert(false); + } + } + // TODO: check if CRC32 checksum from the footer (if present) matches the + // calculated one +} + +[[nodiscard]] bool +reader_context::process_event_in_initial_state(const event ¤t_event) { + assert(state_ == state_type::initial); const auto &common_header{current_event.get_common_header()}; - const auto code{common_header.get_type_code()}; + + // in the "initial" state we expect only artificial rotate events const auto is_artificial{ common_header.get_flags().has_element(flag_type::artificial)}; - const auto is_pseudo{common_header.get_next_event_position_raw() == 0U}; + const auto is_artificial_rotate{ + common_header.get_type_code() == code_type::rotate && is_artificial}; + if (!is_artificial_rotate) { + util::exception_location().raise( + "an artificial rotate event must be the very first event in the " + "initial state"); + } + // artificial rotate events must always have next event position and // timestamp set to 0 + if (common_header.get_timestamp_raw() != 0U) { + util::exception_location().raise( + "non-zero timestamp found in an artificial rotate event"); + } + const auto is_pseudo{common_header.get_next_event_position_raw() == 0U}; + if (!is_pseudo) { + util::exception_location().raise( + "non-zero next event position found in an artificial rotate event"); + } + + // we expect an artificial rotate event to be the very first event in the + // newly-created binlog file + if (position_ != 0U) { + util::exception_location().raise( + "artificial rotate event is not the very first event in a binlog " + "file"); + } + + position_ = static_cast( + current_event.get_post_header().get_position_raw()); + + // transition to the next state + state_ = state_type::rotate_artificial_processed; + return true; +} + +[[nodiscard]] bool +reader_context::process_event_in_rotate_artificial_processed_state( + const event ¤t_event) { + assert(state_ == state_type::rotate_artificial_processed); + const auto &common_header{current_event.get_common_header()}; + + // in the "rotate_artificial_processed" state we expect only format + // description events + if (common_header.get_type_code() != code_type::format_description) { + // TODO: this check should be performed just after the common header is + // parsed to make sure we rely on proper post_header lengths + util::exception_location().raise( + "format description event must follow an artificial rotate event"); + } + + const auto &post_header{ + current_event.get_post_header()}; + + // check if FDE has expected binlog version number + if (post_header.get_binlog_version_raw() != default_binlog_version) { + util::exception_location().raise( + "unexpected binlog version number in format description event"); + } + + // check if FDE has expected common header size + if (post_header.get_common_header_length() != default_common_header_length) { + util::exception_location().raise( + "unexpected common header length in format description event"); + } + + // check if the values from the post_header_lengths array are the same as + // generic_post_header_impl::size_in_bytes for known events + validate_post_header_lengths(post_header.get_post_header_lengths_raw(), + event::expected_post_header_lengths); + + post_header_lengths_ = post_header.get_post_header_lengths_raw(); + + const auto &body{current_event.get_body()}; + checksum_algorithm_ = body.get_checksum_algorithm(); + + // some format description events (non-pseudo ones) must be written to + // the binary log file and advance position when being processed + const auto is_pseudo{common_header.get_next_event_position_raw() == 0U}; + if (!is_pseudo) { + validate_position_and_advance(common_header); + } + // transition to the next state + state_ = state_type::format_description_processed; + return true; +} + +[[nodiscard]] bool +reader_context::process_event_in_format_description_processed_state( + const event ¤t_event) { + assert(state_ == state_type::format_description_processed); + const auto &common_header{current_event.get_common_header()}; + const auto code{common_header.get_type_code()}; + const auto is_artificial{ + common_header.get_flags().has_element(flag_type::artificial)}; + + // early return here with "false" return code so that the while loop + // in the main 'process_event()' method would repeat processing this + // event but from the initial state (which is set here) if (code == code_type::rotate && is_artificial) { - if (common_header.get_timestamp_raw() != 0U) { - util::exception_location().raise( - "non-zero timestamp found in an artificial rotate event"); - } - if (!is_pseudo) { - util::exception_location().raise( - "non-zero next event position found in an artificial rotate event"); - } + position_ = 0U; + state_ = state_type::initial; + return false; + } - // we expect an artificial rotate event to be the very first event in the - // newly-created binlog file - if (position_ != 0U) { - util::exception_location().raise( - "artificial rotate event is not the very first event in a binlog " - "file"); - } + if (code == code_type::format_description) { + // format description event must appear only once within a binlog + util::exception_location().raise( + "encountered second format description event within the same binlog"); + } - position_ = static_cast( - current_event.get_post_header().get_position_raw()); + if (is_artificial) { + util::exception_location().raise( + "unexpected artificial event in the 'format description processed' " + "state"); } - if (!is_artificial && !is_pseudo) { - // every non-artificial event must be preceded by the FDE - // (the exception is FDE itself) - if (code != code_type::format_description && !fde_processed_) { - // TODO: this check should be performed just after the common header is - // parsed to make sure we rely on proper post_header lengths - util::exception_location().raise( - "a non-artificial event encountered before format description event"); - } - // check if common_header.next_event_position matches current position - // plus common_header.event_size - if (position_ + common_header.get_event_size_raw() != - common_header.get_next_event_position_raw()) { - util::exception_location().raise( - "unexpected next event position in the event common header"); - } - // simply advance current position - position_ = common_header.get_next_event_position_raw(); + const auto is_pseudo{common_header.get_next_event_position_raw() == 0U}; + if (is_pseudo) { + util::exception_location().raise( + "unexpected pseudo event in the 'format description processed' state"); } - if (code == code_type::rotate && !is_artificial) { + switch (code) { + case code_type::rotate: + // here we process only real (non-artificial) rotate events because of + // the early return at the beginning of the method + assert(!is_artificial); // position in non-artificial rotate event post header must be equal to // magic_binlog_offset (4) if (current_event.get_post_header().get_position_raw() != @@ -92,64 +201,45 @@ void reader_context::process_event(const event ¤t_event) { "unexpected position in an non-artificial rotate event post " "header"); } - - // normal (non-artificial) ROTATE event is expected to be the last event in - // binlog - so we reset the current position here - position_ = 0U; - } - if (code == code_type::stop) { - // alternatively, the last event in binlog may be not only non-artificial + [[fallthrough]]; + case code_type::stop: + // the last event in binlog could be not only a non-artificial // ROTATE (in case when admin executed FLUSH BINARY LOGS) but STOP (when // the server was shut down) // in this latter case, we also reset the position in order to indicate // the end of the current cycle and expect new ROTATE(artificial) and // FORMAT_DESCRIPTION position_ = 0U; + state_ = state_type::initial; + break; + default: + validate_position_and_advance(common_header); + // not changing the state as we remain in 'format_description_processed' } + return true; +} - // TODO: add some kind of state machine where the expected sequence of events - // is the following - - // (ROTATE(artificial) FORMAT_DESCRIPTION * (ROTATE|STOP))* - if (code == code_type::format_description) { - const auto &post_header{ - current_event.get_post_header()}; - const auto &body{current_event.get_body()}; - - // check if FDE has expected binlog version number - if (post_header.get_binlog_version_raw() != default_binlog_version) { - util::exception_location().raise( - "unexpected binlog version number in format description event"); - } - - // check if FDE has expected common header size - if (post_header.get_common_header_length() != - default_common_header_length) { - util::exception_location().raise( - "unexpected common header length in format description event"); - } - - // check if the values from the post_header_lengths array are the same as - // generic_post_header_impl::size_in_bytes for known events - validate_post_header_lengths(post_header.get_post_header_lengths_raw(), - event::expected_post_header_lengths); - - fde_processed_ = true; - post_header_lengths_ = post_header.get_post_header_lengths_raw(); - checksum_algorithm_ = body.get_checksum_algorithm(); +void reader_context::validate_position_and_advance( + const common_header &common_header) { + assert(common_header.get_next_event_position_raw() != 0U); + // check if common_header.next_event_position matches current position + // plus common_header.event_size + if (position_ + common_header.get_event_size_raw() != + common_header.get_next_event_position_raw()) { + util::exception_location().raise( + "unexpected next event position in the event common header"); } - // TODO: check if CRC32 checksum from the footer (if present) matches the - // calculated one + // simply advance current position + position_ = common_header.get_next_event_position_raw(); } [[nodiscard]] checksum_algorithm_type reader_context::get_current_checksum_algorithm() const noexcept { - assert(has_fde_processed()); return checksum_algorithm_; } [[nodiscard]] std::size_t reader_context::get_current_post_header_length(code_type code) const noexcept { - assert(has_fde_processed()); return get_post_header_length_for_code(post_header_lengths_, code); } diff --git a/src/binsrv/event/reader_context.hpp b/src/binsrv/event/reader_context.hpp index 447a0ef..bc01974 100644 --- a/src/binsrv/event/reader_context.hpp +++ b/src/binsrv/event/reader_context.hpp @@ -19,6 +19,7 @@ #include "binsrv/event/reader_context_fwd.hpp" // IWYU pragma: export #include "binsrv/event/checksum_algorithm_type_fwd.hpp" +#include "binsrv/event/common_header_fwd.hpp" #include "binsrv/event/event_fwd.hpp" #include "binsrv/event/protocol_traits.hpp" @@ -28,11 +29,8 @@ class [[nodiscard]] reader_context { friend class event; public: - reader_context(); + explicit reader_context(checksum_algorithm_type checksum_algorithm); - [[nodiscard]] bool has_fde_processed() const noexcept { - return fde_processed_; - } [[nodiscard]] checksum_algorithm_type get_current_checksum_algorithm() const noexcept; [[nodiscard]] std::size_t @@ -42,13 +40,25 @@ class [[nodiscard]] reader_context { } private: - bool fde_processed_{false}; - // NOLINTNEXTLINE(cppcoreguidelines-use-default-member-init,modernize-use-default-member-init) + // this class implements the logic of the following state machine + // (ROTATE(artificial) FORMAT_DESCRIPTION * (ROTATE|STOP)?)+ + enum class state_type { + initial, + rotate_artificial_processed, + format_description_processed + }; + state_type state_{state_type::initial}; checksum_algorithm_type checksum_algorithm_; post_header_length_container post_header_lengths_{}; std::uint32_t position_{0U}; void process_event(const event ¤t_event); + [[nodiscard]] bool process_event_in_initial_state(const event ¤t_event); + [[nodiscard]] bool process_event_in_rotate_artificial_processed_state( + const event ¤t_event); + [[nodiscard]] bool process_event_in_format_description_processed_state( + const event ¤t_event); + void validate_position_and_advance(const common_header &common_header); }; } // namespace binsrv::event diff --git a/src/easymysql/connection.cpp b/src/easymysql/connection.cpp index 4a6ab7e..5670c09 100644 --- a/src/easymysql/connection.cpp +++ b/src/easymysql/connection.cpp @@ -163,7 +163,7 @@ connection &connection::operator=(connection &&other) noexcept { } // default destructor is OK as 'mysql_impl_' and 'rpl_impl_' will be -// destrojed in the order reverse to how they were declared, e.g. +// destroyed in the order reverse to how they were declared, e.g. // 'rpl_impl_' first, 'mysql_impl_' second connection::~connection() = default;