Skip to content

Commit

Permalink
PS-9244 postfix: binlog_server fails to resume copying binlog file af…
Browse files Browse the repository at this point in the history
…ter server restart (#51)

https://perconadev.atlassian.net/browse/PS-9244

Completely reworked the logic of the 'binsrv::event::reader_context' class - it is
now implemented as a state machine that expects the following sequence of events
within the binlogs:
(ROTATE(artificial) FORMAT_DESCRIPTION <ANY>* (ROTATE|STOP)?)+

We now properly handle the case where, after an improper MySQL server termination,
the last used binary log ends up without a STOP or ROTATE event as its last
event.

The 'binlog_streaming.pull_mode' MTR test case now also covers killing the server
while the Binlog Server Utility is running in the background in 'pull' mode.
  • Loading branch information
percona-ysorokin authored Jun 4, 2024
1 parent 713fd54 commit dd313a6
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 149 deletions.
23 changes: 16 additions & 7 deletions mtr/binlog_streaming/r/pull_mode.result
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,38 @@ INSERT INTO t1 VALUES(DEFAULT);

*** Determining the second binary log name.

*** Filling the table with some more data and dropping the table.
*** Filling the table with some more data.
INSERT INTO t1 VALUES(DEFAULT);

*** Killing the server and restarting it after a pause to test for the
*** missing ROTATE / STOP event at the end of the binary log .
# Kill the server
# restart

*** Determining the third binary log name.

*** Filling the table with some more data again and dropping the table.
INSERT INTO t1 VALUES(DEFAULT);
DROP TABLE t1;

*** FLUSHING the binlog one more time to make sure that the second one
*** FLUSHING the binlog one more time to make sure that the third one
*** is no longer open.
FLUSH BINARY LOGS;

*** Determining the third binary log name.
*** Determining the fourth binary log name.

*** Waiting till Binlog Server Utility starts processing the third
*** Waiting till Binlog Server Utility starts processing the fourth
*** binary log.

*** Sending SIGTERM signal to the Binlog Server Utility and waiting for
*** the process to terminate

*** Checking that the Binlog Server utility detected an empty storage
include/assert_grep.inc [Binlog storage must be initialized on an empty directory]

*** Comparing server and downloaded versions of the first binlog file

*** Comparing server and downloaded versions of the second binlog file

*** Comparing server and downloaded versions of the third binlog file

*** Removing the Binlog Server utility storage directory.

*** Removing the Binlog Server utility log file.
Expand Down
47 changes: 31 additions & 16 deletions mtr/binlog_streaming/t/pull_mode.test
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,47 @@ INSERT INTO t1 VALUES(DEFAULT);
--let $second_binlog = query_get_value(SHOW MASTER STATUS, File, 1)

--echo
--echo *** Filling the table with some more data and dropping the table.
--echo *** Filling the table with some more data.
INSERT INTO t1 VALUES(DEFAULT);

--echo
--echo *** Killing the server and restarting it after a pause to test for the
--echo *** missing ROTATE / STOP event at the end of the binary log .
--source include/kill_mysqld.inc
# Sleeping here deliberately so that the Binlog Server Utility encounters a
# read timeout and tries to reconnect several times.
--sleep 10
--source include/start_mysqld.inc

--echo
--echo *** Determining the third binary log name.
--let $third_binlog = query_get_value(SHOW MASTER STATUS, File, 1)

--echo
--echo *** Filling the table with some more data again and dropping the table.
INSERT INTO t1 VALUES(DEFAULT);
DROP TABLE t1;

--echo
--echo *** FLUSHING the binlog one more time to make sure that the second one
--echo *** FLUSHING the binlog one more time to make sure that the third one
--echo *** is no longer open.
FLUSH BINARY LOGS;

--echo
--echo *** Determining the third binary log name.
--let $third_binlog = query_get_value(SHOW MASTER STATUS, File, 1)
--echo *** Determining the fourth binary log name.
--let $fourth_binlog = query_get_value(SHOW MASTER STATUS, File, 1)

--echo
--echo *** Waiting till Binlog Server Utility starts processing the third
--echo *** Waiting till Binlog Server Utility starts processing the fourth
--echo *** binary log.
# We grep the Binlog Server Utility log file in a loop until we encounter the
# third binary log file name.
# fourth binary log file name.
--let $max_number_of_attempts = 60
--let $iteration = 0
while($iteration < $max_number_of_attempts)
{
--error 0, 1
--exec grep --silent $third_binlog $binsrv_log_path
--exec grep --silent $fourth_binlog $binsrv_log_path
--let $grep_status = $__error
if ($grep_status == 0)
{
Expand All @@ -97,7 +114,7 @@ while($iteration < $max_number_of_attempts)
}
if ($grep_status != 0)
{
--die The Binlog Server Utility did not start processing the third binary log.
--die The Binlog Server Utility did not start processing the fourth binary log.
}

--echo
Expand All @@ -121,14 +138,6 @@ EOF

--remove_file $binsrv_pid_file

--echo
--echo *** Checking that the Binlog Server utility detected an empty storage
--let $assert_text = Binlog storage must be initialized on an empty directory
--let $assert_file = $binsrv_log_path
--let $assert_count = 1
--let $assert_select = binlog storage initialized on an empty directory
--source include/assert_grep.inc

--echo
--echo *** Comparing server and downloaded versions of the first binlog file
--let $local_file = $binlog_base_dir/$first_binlog
Expand All @@ -141,6 +150,12 @@ EOF
--let $storage_object = $binsrv_storage_path/$second_binlog
--source ../include/diff_with_storage_object.inc

--echo
--echo *** Comparing server and downloaded versions of the third binlog file
--let $local_file = $binlog_base_dir/$third_binlog
--let $storage_object = $binsrv_storage_path/$third_binlog
--source ../include/diff_with_storage_object.inc

# cleaning up
--source ../include/tear_down_binsrv_environment.inc

Expand Down
13 changes: 12 additions & 1 deletion src/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "binsrv/storage.hpp"
#include "binsrv/storage_backend_factory.hpp"

#include "binsrv/event//checksum_algorithm_type.hpp"
#include "binsrv/event/code_type.hpp"
#include "binsrv/event/event.hpp"
#include "binsrv/event/flag_type.hpp"
Expand Down Expand Up @@ -222,6 +223,12 @@ void process_binlog_event(const binsrv::event::event &current_event,

skip_open_binlog = false;
} else {
// if the server was not shut down properly, the previous binlog will not
// end with a ROTATE or STOP event; we handle this case here by closing the
// old binlog (if it is still open) before opening the new one
if (storage.is_binlog_open()) {
storage.close_binlog();
}
storage.open_binlog(current_rotate_body.get_binlog());
}
}
Expand Down Expand Up @@ -291,7 +298,11 @@ void receive_binlog_events(

util::const_byte_span portion;

binsrv::event::reader_context context{};
// TODO: change this checksum algorithm to the value of the
// @source_binlog_checksum variable that we set in the
// 'connection::switch_to_replication()'
binsrv::event::reader_context context{
binsrv::event::checksum_algorithm_type::off};

// if binlog is still open, there is no sense to close it and re-open
// instead, we will just instruct this loop to process the
Expand Down
46 changes: 8 additions & 38 deletions src/binsrv/event/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

#include "binsrv/event/checksum_algorithm_type.hpp"
#include "binsrv/event/code_type.hpp"
#include "binsrv/event/flag_type.hpp"
#include "binsrv/event/generic_body.hpp"
#include "binsrv/event/generic_post_header.hpp"
#include "binsrv/event/protocol_traits_fwd.hpp"
Expand Down Expand Up @@ -59,18 +58,12 @@ event::event(reader_context &context, util::const_byte_span portion)
// format_description_events always include event footers with checksums
footer_size = footer::size_in_bytes;
} else {
if (context.has_fde_processed()) {
// if format_description event has already been encountered, we determine
// whether there is a footer in the event from it
footer_size = (context.get_current_checksum_algorithm() ==
checksum_algorithm_type::crc32
? footer::size_in_bytes
: 0U);
} else {
// we get in this branch only for the very first artificial rotate event
// and in this case it does not include the footer
footer_size = 0U;
}
// we determine whether there is a footer in the event from the
// reader_context
footer_size = (context.get_current_checksum_algorithm() ==
checksum_algorithm_type::crc32
? footer::size_in_bytes
: 0U);
}

const std::size_t event_size = std::size(portion);
Expand All @@ -80,31 +73,8 @@ event::event(reader_context &context, util::const_byte_span portion)
"header");
}
std::size_t post_header_size{0U};
if (context.has_fde_processed()) {
// if format_description event has already been encountered in the stream,
// we take post-header length from it
post_header_size = context.get_current_post_header_length(code);
} else {
// we expect that we can receive only 2 events before there is a
// format_description event we can refer to: rotate (with artificial
// flag) and format description event itself
post_header_size = get_expected_post_header_length(code);
switch (code) {
case code_type::rotate:
if (!common_header_.get_flags().has_element(flag_type::artificial)) {
util::exception_location().raise<std::logic_error>(
"rotate event without preceding format_description event must have "
"'artificial' flag set");
}
break;
case code_type::format_description:
break;
default:
util::exception_location().raise<std::logic_error>(
"this type of event must be preceded by a format_description event");
}
assert(post_header_size != unspecified_post_header_length);
}
post_header_size = context.get_current_post_header_length(code);
assert(post_header_size != unspecified_post_header_length);

const std::size_t group_size =
common_header::size_in_bytes + post_header_size + footer_size;
Expand Down
Loading

0 comments on commit dd313a6

Please sign in to comment.