From 07b8637598800699fcd142db120870e76179662f Mon Sep 17 00:00:00 2001
From: Yura Sorokin
Date: Thu, 9 May 2024 02:39:08 +0200
Subject: [PATCH] Implemented AWS S3 storage backend (#44)

The current implementation accepts storage URIs in the following form:
's3://[<access_key_id>:<secret_access_key>@]<bucket>[.<region>]/<path>'.
In case '<region>' is not specified explicitly, it will be auto-detected
from the location of the '<bucket>' bucket.

As AWS S3 does not have a direct way to append data to an existing
object, we use the following strategy for resuming stream operations.
* Upon opening a stream (via the 'do_open_stream()' method) for an
  object that already exists on S3 (in case we run the utility on a
  storage that has already been initialized), we download the content
  of this object into a temporary file.
* All writes (data appends) requested via the 'do_write_data_to_stream()'
  method are performed on this temporary file.
* When the 'do_close_stream()' method is called, we upload the content
  of this temporary file back to S3 (overwriting the content of the
  existing object).

Redesigned the 'binsrv::s3_storage_backend' class. Introduced an
internal 'aws_context' class that follows the 'pimpl' idiom, so that the
main class include file 'binsrv/s3_storage_backend.hpp' does not depend
on any AWS SDK ('<aws/...>') headers.

The 'binsrv::filesystem_storage_backend' class now explicitly specifies
required combinations of the
'std::ios_base::[in | out | binary | trunc | app]' flags in full for all
internal 'std::ifstream' / 'std::ofstream' / 'std::fstream' objects.

The 'binsrv::storage' class destructor now tries to call
'backend_->close_stream();' in order to flush stream state to the
storage backend in case of normal / exceptional shutdown.

The 'binsrv::basic_storage_backend' class is extended with an additional
method 'is_stream_open()' indicating whether the object is in a state
between 'open_stream()' and 'close_stream()' calls.

The 'open_stream()' method in the 'binsrv::basic_storage_backend' class
now accepts one additional parameter explicitly indicating the intent to
either create a new storage stream or append to an existing one.

Similarly to 'easymysql::core_error', added the 'binsrv::s3_error'
exception class with its own 's3_category()' error category
('std::error_category').

Similarly to 'easymysql::raise_core_error_from_connection()', added the
'raise_s3_error_from_outcome()' helper function, which throws an
exception with error info extracted from 'Aws::S3Crt::S3CrtError' (the
error-path alternative of almost any 'S3Crt' call outcome).

The 'easymysql::raise_core_error_from_connection()' helper function is
extended with an additional 'user_message' parameter.

The main application now prints 'successfully shut down' to the log at
the end of execution.

The 'binsrv' MTR test case is extended with additional logic that allows
using both 'file://' and 's3://' backend storage providers (the choice
depends on whether the 'MTR_BINSRV_AWS_ACCESS_KEY_ID' /
'MTR_BINSRV_AWS_SECRET_ACCESS_KEY' / 'MTR_BINSRV_AWS_S3_BUCKET'
environment variables are set or not). Added extra precautions against
accidentally leaking AWS credentials: we now temporarily disable the
MySQL general query log to make sure that the
'MTR_BINSRV_AWS_ACCESS_KEY_ID' / 'MTR_BINSRV_AWS_SECRET_ACCESS_KEY'
values will not appear in the recorded SQL queries.

Added the 'diff_with_storage_object.inc' MTR include file that can
compare a local file with an object from backend storage (either 'file'
or 's3').

Added more instructions on how to make the 'binsrv' MTR test case use
AWS S3 as a storage backend in 'mtr/README'.

Added a '.clang-format' file to the 'mtr' directory to exclude MTR test
cases and include files from being processed by 'clang-format'.
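For illustration, the two accepted URI shapes look as follows (the
access key, secret, bucket, region, and path values below are made-up
placeholders, not real credentials):

  s3://AKIAEXAMPLE:wJalrEXAMPLEKEY@my-bucket.eu-central-1/binlog-storage
      (region 'eu-central-1' specified explicitly)
  s3://AKIAEXAMPLE:wJalrEXAMPLEKEY@my-bucket/binlog-storage
      (region auto-detected from the location of 'my-bucket')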
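The essence of the stream-resume strategy can be summarized with the
following simplified, self-contained C++ sketch. The
'download_object_to_file()' / 'upload_file_to_object()' helpers and the
's3_stream_sketch' class are hypothetical stand-ins invented for this
illustration; the real logic lives in 'binsrv::s3_storage_backend' and
talks to S3 via 'Aws::S3Crt::S3CrtClient':

  #include <cstddef>
  #include <filesystem>
  #include <fstream>
  #include <ios>
  #include <span>
  #include <string>
  #include <string_view>

  enum class open_stream_mode { create, append };

  // hypothetical stand-ins for the real S3 transfer calls (GetObject /
  // PutObject on Aws::S3Crt::S3CrtClient); intentionally left as no-ops
  void download_object_to_file(std::string_view /*name*/,
                               const std::filesystem::path & /*file*/) {}
  void upload_file_to_object(std::string_view /*name*/,
                             const std::filesystem::path & /*file*/) {}

  class s3_stream_sketch {
  public:
    void open(std::string_view name, open_stream_mode mode) {
      name_ = name;
      tmp_path_ = std::filesystem::temp_directory_path() / "s3_stream.tmp";
      // 'append' means the object already exists on S3: download its
      // current content first so that subsequent writes extend it
      if (mode == open_stream_mode::append) {
        download_object_to_file(name_, tmp_path_);
      }
      // 'in | out | app' creates the file when it does not exist yet
      tmp_.open(tmp_path_, std::ios_base::in | std::ios_base::out |
                               std::ios_base::binary | std::ios_base::app);
    }
    void write(std::span<const std::byte> data) {
      // all appends go to the local temporary file, not to S3
      tmp_.write(reinterpret_cast<const char *>(data.data()),
                 static_cast<std::streamsize>(data.size()));
    }
    void close() {
      tmp_.close();
      // overwrite the whole S3 object with the accumulated content
      upload_file_to_object(name_, tmp_path_);
      std::filesystem::remove(tmp_path_);
    }

  private:
    std::string name_;
    std::filesystem::path tmp_path_;
    std::fstream tmp_;
  };

  int main() {
    s3_stream_sketch stream;
    stream.open("binlog.000001", open_stream_mode::create);
    // the magic binlog header bytes, as an example payload
    const std::byte payload[4]{std::byte{0xfe}, std::byte{0x62},
                               std::byte{0x69}, std::byte{0x6e}};
    stream.write(payload);
    stream.close();
  }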
--- CMakeLists.txt | 5 + mtr/.clang-format | 2 + mtr/README | 12 + .../include/diff_with_storage_object.inc | 39 ++ mtr/binlog_streaming/r/binsrv.result | 29 +- mtr/binlog_streaming/t/binsrv.test | 104 +++- src/app.cpp | 1 + src/binsrv/basic_storage_backend.cpp | 15 +- src/binsrv/basic_storage_backend.hpp | 9 +- src/binsrv/basic_storage_backend_fwd.hpp | 2 + src/binsrv/cout_logger.hpp | 2 +- src/binsrv/file_logger.hpp | 2 +- src/binsrv/filesystem_storage_backend.cpp | 52 +- src/binsrv/filesystem_storage_backend.hpp | 8 +- src/binsrv/s3_error.cpp | 45 ++ src/binsrv/s3_error.hpp | 40 ++ src/binsrv/s3_error_helpers_private.cpp | 68 +++ src/binsrv/s3_error_helpers_private.hpp | 32 ++ src/binsrv/s3_storage_backend.cpp | 519 ++++++++++++++++-- src/binsrv/s3_storage_backend.hpp | 46 +- src/binsrv/storage.cpp | 17 +- src/binsrv/storage.hpp | 2 +- src/easymysql/binlog.cpp | 4 +- src/easymysql/connection.cpp | 5 +- src/easymysql/core_error_helpers_private.cpp | 14 +- src/easymysql/core_error_helpers_private.hpp | 4 +- 26 files changed, 944 insertions(+), 134 deletions(-) create mode 100644 mtr/.clang-format create mode 100644 mtr/binlog_streaming/include/diff_with_storage_object.inc create mode 100644 src/binsrv/s3_error.cpp create mode 100644 src/binsrv/s3_error.hpp create mode 100644 src/binsrv/s3_error_helpers_private.cpp create mode 100644 src/binsrv/s3_error_helpers_private.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 266333a..fbc4059 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -185,6 +185,11 @@ set(source_files src/binsrv/main_config.hpp src/binsrv/main_config.cpp + src/binsrv/s3_error_helpers_private.hpp + src/binsrv/s3_error_helpers_private.cpp + src/binsrv/s3_error.hpp + src/binsrv/s3_error.cpp + src/binsrv/s3_storage_backend.hpp src/binsrv/s3_storage_backend.cpp diff --git a/mtr/.clang-format b/mtr/.clang-format new file mode 100644 index 0000000..47a38a9 --- /dev/null +++ b/mtr/.clang-format @@ -0,0 +1,2 @@ +DisableFormat: true +SortIncludes: Never diff --git a/mtr/README b/mtr/README index 12a42c6..c80bfa1 100644 --- a/mtr/README +++ b/mtr/README @@ -7,3 +7,15 @@ following: 2. Set 'BINSRV' environment variable pointing to the 'binlog_server' binary and run MTR. BINSRV=/binlog_server ./mysql-test/mtr --suite=binlog_streaming +3. In order to run the tests using AWS S3 as a storage backend, also define the + following environment variables: + * MTR_BINSRV_AWS_ACCESS_KEY_ID - AWS access key ID + * MTR_BINSRV_AWS_SECRET_ACCESS_KEY - AWS secret access key + * MTR_BINSRV_AWS_S3_BUCKET - AWS S3 bucket name + * MTR_BINSRV_AWS_S3_REGION - AWS S3 region (optional) + BINSRV=/binlog_server \ + MTR_BINSRV_AWS_ACCESS_KEY_ID=... \ + MTR_BINSRV_AWS_SECRET_ACCESS_KEY=... \ + MTR_BINSRV_AWS_S3_BUCKET=my-bucket \ + MTR_BINSRV_AWS_S3_REGION=eu-central-1 \ + ./mysql-test/mtr --suite=binlog_streaming diff --git a/mtr/binlog_streaming/include/diff_with_storage_object.inc b/mtr/binlog_streaming/include/diff_with_storage_object.inc new file mode 100644 index 0000000..0b06825 --- /dev/null +++ b/mtr/binlog_streaming/include/diff_with_storage_object.inc @@ -0,0 +1,39 @@ +# +# Compares a file on a local filesystem with an object from the backend storage. +# +# Usage: +# --let $storage_backend = file +# --let $local_file = $MYSQL_TMP_DIR/first +# --let $storage_object = $MYSQL_TMP_DIR/second +# --source diff_with_storage_object.inc +# +# or +# --let $aws_cli = AWS_ACCESS_KEY_ID=... AWS_SECRET_ACCESS_KEY=...
aws +# --let $storage_backend = s3 +# --let $local_file = $MYSQL_TMP_DIR/first +# --let $aws_s3_bucket = my-bucket +# --let $storage_object = /vault/second +# --source diff_with_storage_object.inc +# +# $storage_backend - determines storage backend type (either 'file' or 's3') +# $aws_cli - path to AWS command line interface (cli) tools with AWS_ACCESS_KEY_ID / +# AWS_SECRET_ACCESS_KEY environment variables set appropriately +# (needed only if $storage_backend is 's3') +# $local_file - a path to the first file on a local filesystem +# $aws_s3_bucket - AWS S3 bucket name (needed only if $storage_backend is 's3') +# $storage_object - if $storage_backend is 'file', a path to the second file on +# a local filesystem; if $storage_backend is 's3', a path to +# the second object on AWS S3 +# + if ($storage_backend == file) { + --diff_files $local_file $storage_object } if ($storage_backend == s3) { + --let $downloaded_file_path = $MYSQL_TMP_DIR/diff_with_storage_object.downloaded + --exec $aws_cli s3 cp s3://$aws_s3_bucket$storage_object $downloaded_file_path > /dev/null + --diff_files $local_file $downloaded_file_path + --remove_file $downloaded_file_path } diff --git a/mtr/binlog_streaming/r/binsrv.result b/mtr/binlog_streaming/r/binsrv.result index 0433b7c..378bd88 100644 --- a/mtr/binlog_streaming/r/binsrv.result +++ b/mtr/binlog_streaming/r/binsrv.result @@ -20,28 +20,6 @@ INSERT INTO t1 VALUES(DEFAULT); *** Generating a configuration file in JSON format for the Binlog *** Server utility. -SET @storage_path = ''; -SET @storage_uri = CONCAT('file://', @storage_path); -SET @log_path = ''; -SET @delimiter_pos = INSTR(USER(), '@'); -SET @connection_user = SUBSTRING(USER(), 1, @delimiter_pos - 1); -SET @connection_host = SUBSTRING(USER(), @delimiter_pos + 1); -SET @connection_host = IF(@connection_host = 'localhost', '127.0.0.1', @connection_host); -SET @binsrv_config_json = JSON_OBJECT( -'logger', JSON_OBJECT( -'level', 'trace', -'file', @log_path -), -'connection', JSON_OBJECT( -'host', @connection_host, -'port', @@global.port, -'user', @connection_user, -'password', '' - ), -'storage', JSON_OBJECT( -'uri', @storage_uri -) -); *** Determining binlog file directory from the server. @@ -52,6 +30,9 @@ SET @binsrv_config_json = JSON_OBJECT( *** from the server to the directory (second *** binlog is still open / in use). +*** Checking that the Binlog Server utility detected an empty storage +include/assert_grep.inc [Binlog storage must be initialized on an empty directory] + *** Comparing server and downloaded versions of the first binlog file. *** Patching the server version of the second binlog file to clear the @@ -71,6 +52,10 @@ FLUSH BINARY LOGS; *** binlog is no longer open / in use). Here we should also continue *** streaming binlog events from the last saved position. +*** Checking that the Binlog Server utility detected a previously +*** initialized storage +include/assert_grep.inc [Binlog storage must be initialized on a non-empty directory] + *** Comparing server and downloaded versions of the first binlog file *** one more time.
diff --git a/mtr/binlog_streaming/t/binsrv.test b/mtr/binlog_streaming/t/binsrv.test index 89cb70f..2441b36 100644 --- a/mtr/binlog_streaming/t/binsrv.test +++ b/mtr/binlog_streaming/t/binsrv.test @@ -1,3 +1,11 @@ +# The following environment variables must be defined to use AWS S3 as a +# storage backend: +# - $MTR_BINSRV_AWS_ACCESS_KEY_ID +# - $MTR_BINSRV_AWS_SECRET_ACCESS_KEY +# - $MTR_BINSRV_AWS_S3_BUCKET +# - $MTR_BINSRV_AWS_S3_REGION (optional) + + # make sure that $BINSRV environment variable is set to the absolute path # of the Binlog Server utility before running this test if (!$BINSRV) { @@ -36,16 +44,51 @@ FLUSH BINARY LOGS; --echo *** Filling the table with some more data. INSERT INTO t1 VALUES(DEFAULT); +--let $storage_backend = file +if ($MTR_BINSRV_AWS_ACCESS_KEY_ID != '') +{ + if ($MTR_BINSRV_AWS_SECRET_ACCESS_KEY != '') + { + if ($MTR_BINSRV_AWS_S3_BUCKET != '') + { + --let $storage_backend = s3 + --let $aws_cli = AWS_ACCESS_KEY_ID=$MTR_BINSRV_AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY=$MTR_BINSRV_AWS_SECRET_ACCESS_KEY aws + if ($MTR_BINSRV_AWS_S3_REGION != '') + { + --let $aws_cli = $aws_cli --region $MTR_BINSRV_AWS_S3_REGION + } + } + } +} + --echo --echo *** Generating a configuration file in JSON format for the Binlog --echo *** Server utility. ---let $binsrv_storage_path = $MYSQL_TMP_DIR/storage ---replace_result $binsrv_storage_path -eval SET @storage_path = '$binsrv_storage_path'; -SET @storage_uri = CONCAT('file://', @storage_path); + +# temporarily disabling MySQL general query log so that AWS credentials +# will not appear in plain text in recorded SQL queries +--disable_query_log +SET @old_sql_log_off = @@sql_log_off; +SET sql_log_off = ON; + +if ($storage_backend == file) +{ + --let $binsrv_storage_path = $MYSQL_TMP_DIR/storage + eval SET @storage_uri = CONCAT('file://', '$binsrv_storage_path'); +} +if ($storage_backend == s3) +{ + --let $qualified_bucket = $MTR_BINSRV_AWS_S3_BUCKET + if ($MTR_BINSRV_AWS_S3_REGION) + { + --let $qualified_bucket = $qualified_bucket.$MTR_BINSRV_AWS_S3_REGION + } + --let $binsrv_storage_path = `SELECT CONCAT('/mtr-', UUID())` + eval SET @storage_uri = CONCAT('s3://', '$MTR_BINSRV_AWS_ACCESS_KEY_ID', ':', '$MTR_BINSRV_AWS_SECRET_ACCESS_KEY', '@', '$qualified_bucket', '$binsrv_storage_path'); + --let $aws_s3_bucket = $MTR_BINSRV_AWS_S3_BUCKET +} --let $binsrv_log_path = $MYSQL_TMP_DIR/binsrv_utility.log ---replace_result $binsrv_log_path eval SET @log_path = '$binsrv_log_path'; SET @delimiter_pos = INSTR(USER(), '@'); @@ -74,6 +117,9 @@ eval SET @binsrv_config_json = JSON_OBJECT( --let $write_to_file = $binsrv_config_file_path --source include/write_var_to_file.inc +SET sql_log_off = @old_sql_log_off; +--enable_query_log + --echo --echo *** Determining binlog file directory from the server. --disable_query_log @@ -89,7 +135,10 @@ if ($have_windows) { --echo --echo *** Creating a temporary directory for storing --echo *** binlog files downloaded via the Binlog Server utility. ---mkdir $binsrv_storage_path +if ($storage_backend == file) +{ + --mkdir $binsrv_storage_path +} --echo --echo *** Executing the Binlog Server utility to download all binlog data @@ -97,13 +146,23 @@ if ($have_windows) { --echo *** binlog is still open / in use).
--exec $BINSRV $binsrv_config_file_path > /dev/null +--echo +--echo *** Checking that the Binlog Server utility detected an empty storage +--let $assert_text = Binlog storage must be initialized on an empty directory +--let $assert_file = $binsrv_log_path +--let $assert_count = 1 +--let $assert_select = binlog storage initialized on an empty directory +--source include/assert_grep.inc + # At this point we have 2 binlog files $first_binlog (already closed/rotated # by the server) and $second_binlog (currently open). # The former can be compared as is. --echo --echo *** Comparing server and downloaded versions of the first binlog file. ---diff_files $binlog_base_dir/$first_binlog $binsrv_storage_path/$first_binlog +--let $local_file = $binlog_base_dir/$first_binlog +--let $storage_object = $binsrv_storage_path/$first_binlog +--source ../include/diff_with_storage_object.inc # Because the latter from the server is currently open for writing, it has one # additional bit (LOG_EVENT_BINLOG_IN_USE_F = 0x1) set in the flags field of the @@ -141,7 +200,10 @@ EOF --echo --echo *** Comparing server and downloaded versions of the second binlog file. ---diff_files $PATCHED_BINLOG_FILE $binsrv_storage_path/$second_binlog +--let $local_file = $PATCHED_BINLOG_FILE +--let $storage_object = $binsrv_storage_path/$second_binlog +--source ../include/diff_with_storage_object.inc + --remove_file $PATCHED_BINLOG_FILE --echo @@ -160,19 +222,39 @@ FLUSH BINARY LOGS; --echo *** streaming binlog events from the last saved position. --exec $BINSRV $binsrv_config_file_path > /dev/null +--echo +--echo *** Checking that the Binlog Server utility detected a previously +--echo *** initialized storage +--let $assert_text = Binlog storage must be initialized on a non-empty directory +--let $assert_file = $binsrv_log_path +--let $assert_count = 1 +--let $assert_select = binlog storage initialized at +--source include/assert_grep.inc + --echo --echo *** Comparing server and downloaded versions of the first binlog file --echo *** one more time. ---diff_files $binlog_base_dir/$first_binlog $binsrv_storage_path/$first_binlog +--let $local_file = $binlog_base_dir/$first_binlog +--let $storage_object = $binsrv_storage_path/$first_binlog +--source ../include/diff_with_storage_object.inc --echo --echo *** Comparing server and downloaded versions of the second binlog file --echo *** (without patching) one more time. ---diff_files $binlog_base_dir/$second_binlog $binsrv_storage_path/$second_binlog +--let $local_file = $binlog_base_dir/$second_binlog +--let $storage_object = $binsrv_storage_path/$second_binlog +--source ../include/diff_with_storage_object.inc --echo --echo *** Removing the Binlog Server utility storage directory. ---force-rmdir $binsrv_storage_path +if ($storage_backend == file) +{ + --force-rmdir $binsrv_storage_path +} +if ($storage_backend == s3) +{ + --exec $aws_cli s3 rm s3://$aws_s3_bucket$binsrv_storage_path/ --recursive > /dev/null +} --echo --echo *** Removing the Binlog Server utility log file. diff --git a/src/app.cpp b/src/app.cpp index 6a2d2a3..17391e0 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -279,6 +279,7 @@ int main(int argc, char *argv[]) { receive_binlog_events(*logger, binlog, storage); + logger->log(binsrv::log_severity::info, "successfully shut down"); exit_code = EXIT_SUCCESS; } catch (...)
{ handle_std_exception(logger); diff --git a/src/binsrv/basic_storage_backend.cpp b/src/binsrv/basic_storage_backend.cpp index e260060..35d2cb6 100644 --- a/src/binsrv/basic_storage_backend.cpp +++ b/src/binsrv/basic_storage_backend.cpp @@ -39,18 +39,19 @@ void basic_storage_backend::put_object(std::string_view name, return do_put_object(name, content); } -void basic_storage_backend::open_stream(std::string_view name) { - if (stream_opened_) { +void basic_storage_backend::open_stream(std::string_view name, + storage_backend_open_stream_mode mode) { + if (stream_open_) { util::exception_location().raise( "cannot open a new stream as the previous one has not been closed"); } - do_open_stream(name); - stream_opened_ = true; + do_open_stream(name, mode); + stream_open_ = true; } void basic_storage_backend::write_data_to_stream(util::const_byte_span data) { - if (!stream_opened_) { + if (!stream_open_) { util::exception_location().raise( "cannot write to the stream as it has not been opened"); } @@ -58,12 +59,12 @@ void basic_storage_backend::write_data_to_stream(util::const_byte_span data) { } void basic_storage_backend::close_stream() { - if (!stream_opened_) { + if (!stream_open_) { util::exception_location().raise( "cannot close the stream as it has not been opened"); } do_close_stream(); - stream_opened_ = false; + stream_open_ = false; } [[nodiscard]] std::string basic_storage_backend::get_description() const { diff --git a/src/binsrv/basic_storage_backend.hpp b/src/binsrv/basic_storage_backend.hpp index 34ff827..5d224ed 100644 --- a/src/binsrv/basic_storage_backend.hpp +++ b/src/binsrv/basic_storage_backend.hpp @@ -39,21 +39,24 @@ class basic_storage_backend { [[nodiscard]] std::string get_object(std::string_view name); void put_object(std::string_view name, util::const_byte_span content); - void open_stream(std::string_view name); + [[nodiscard]] bool is_stream_open() const noexcept { return stream_open_; } + void open_stream(std::string_view name, + storage_backend_open_stream_mode mode); void write_data_to_stream(util::const_byte_span data); void close_stream(); [[nodiscard]] std::string get_description() const; private: - bool stream_opened_{false}; + bool stream_open_{false}; [[nodiscard]] virtual storage_object_name_container do_list_objects() = 0; [[nodiscard]] virtual std::string do_get_object(std::string_view name) = 0; virtual void do_put_object(std::string_view name, util::const_byte_span content) = 0; - virtual void do_open_stream(std::string_view name) = 0; + virtual void do_open_stream(std::string_view name, + storage_backend_open_stream_mode mode) = 0; virtual void do_write_data_to_stream(util::const_byte_span data) = 0; virtual void do_close_stream() = 0; diff --git a/src/binsrv/basic_storage_backend_fwd.hpp b/src/binsrv/basic_storage_backend_fwd.hpp index b71971b..806a439 100644 --- a/src/binsrv/basic_storage_backend_fwd.hpp +++ b/src/binsrv/basic_storage_backend_fwd.hpp @@ -23,6 +23,8 @@ namespace binsrv { +enum class storage_backend_open_stream_mode { create, append }; + class basic_storage_backend; using basic_storage_backend_ptr = std::unique_ptr; diff --git a/src/binsrv/cout_logger.hpp b/src/binsrv/cout_logger.hpp index 3083843..ce22eeb 100644 --- a/src/binsrv/cout_logger.hpp +++ b/src/binsrv/cout_logger.hpp @@ -22,7 +22,7 @@ namespace binsrv { -class [[nodiscard]] cout_logger : public basic_logger { +class [[nodiscard]] cout_logger final : public basic_logger { public: explicit cout_logger(log_severity min_level) : basic_logger{min_level} {} diff --git 
a/src/binsrv/file_logger.hpp b/src/binsrv/file_logger.hpp index 8b57336..88cad26 100644 --- a/src/binsrv/file_logger.hpp +++ b/src/binsrv/file_logger.hpp @@ -25,7 +25,7 @@ namespace binsrv { -class [[nodiscard]] file_logger : public basic_logger { +class [[nodiscard]] file_logger final : public basic_logger { public: file_logger(log_severity min_level, std::string_view file_name); diff --git a/src/binsrv/filesystem_storage_backend.cpp b/src/binsrv/filesystem_storage_backend.cpp index 292a2c3..b1c9661 100644 --- a/src/binsrv/filesystem_storage_backend.cpp +++ b/src/binsrv/filesystem_storage_backend.cpp @@ -15,7 +15,7 @@ #include "binsrv/filesystem_storage_backend.hpp" -#include +#include #include #include #include @@ -99,25 +99,24 @@ filesystem_storage_backend::do_list_objects() { [[nodiscard]] std::string filesystem_storage_backend::do_get_object(std::string_view name) { - const auto index_path{get_object_path(name)}; - - static constexpr std::size_t max_file_size{1048576U}; + const auto object_path{get_object_path(name)}; // opening in binary mode - std::ifstream ifs{index_path, std::ios_base::binary}; - if (!ifs.is_open()) { + std::ifstream object_ifs{object_path, + std::ios_base::in | std::ios_base::binary}; + if (!object_ifs.is_open()) { util::exception_location().raise( "cannot open underlying object file"); } - auto file_size = std::filesystem::file_size(index_path); - if (file_size > max_file_size) { + auto file_size = std::filesystem::file_size(object_path); + if (file_size > max_memory_object_size) { util::exception_location().raise( - "underlying object file is too large"); + "underlying object file is too large to be loaded in memory"); } std::string file_content(file_size, 'x'); - if (!ifs.read(std::data(file_content), - static_cast<std::streamsize>(file_size))) { + if (!object_ifs.read(std::data(file_content), + static_cast<std::streamsize>(file_size))) { util::exception_location().raise( "cannot read underlying object file content"); } @@ -126,26 +125,33 @@ filesystem_storage_backend::do_get_object(std::string_view name) { void filesystem_storage_backend::do_put_object(std::string_view name, util::const_byte_span content) { - const auto index_path = get_object_path(name); + const auto object_path = get_object_path(name); // opening in binary mode with truncating - std::ofstream index_ofs{index_path, std::ios_base::trunc}; - if (!index_ofs.is_open()) { + std::ofstream object_ofs{object_path, std::ios_base::out | + std::ios_base::binary | + std::ios_base::trunc}; + if (!object_ofs.is_open()) { util::exception_location().raise( "cannot open underlying object file for writing"); } const auto content_sv = util::as_string_view(content); - if (!index_ofs.write(std::data(content_sv), - static_cast<std::streamsize>(std::size(content_sv)))) { + if (!object_ofs.write(std::data(content_sv), + static_cast<std::streamsize>(std::size(content_sv)))) { util::exception_location().raise( "cannot write data to underlying object file"); } } -void filesystem_storage_backend::do_open_stream(std::string_view name) { - std::filesystem::path current_file_path{root_path_}; - current_file_path /= name; +void filesystem_storage_backend::do_open_stream( + std::string_view name, storage_backend_open_stream_mode mode) { + assert(!ofs_.is_open()); + const std::filesystem::path current_file_path{get_object_path(name)}; - ofs_.open(current_file_path, std::ios_base::binary | std::ios_base::app); + auto open_mode{std::ios_base::out | std::ios_base::binary | + (mode == storage_backend_open_stream_mode::create ?
std::ios_base::trunc + : std::ios_base::app)}; + ofs_.open(current_file_path, open_mode); if (!ofs_.is_open()) { util::exception_location().raise( "cannot open underlying file for the stream"); @@ -154,6 +160,7 @@ void filesystem_storage_backend::do_open_stream(std::string_view name) { void filesystem_storage_backend::do_write_data_to_stream( util::const_byte_span data) { + assert(ofs_.is_open()); const auto data_sv = util::as_string_view(data); if (!ofs_.write(std::data(data_sv), static_cast(std::size(data_sv)))) { @@ -164,7 +171,10 @@ void filesystem_storage_backend::do_write_data_to_stream( // use fsync() system call here } -void filesystem_storage_backend::do_close_stream() { ofs_.close(); } +void filesystem_storage_backend::do_close_stream() { + assert(ofs_.is_open()); + ofs_.close(); +} [[nodiscard]] std::string filesystem_storage_backend::do_get_description() const { diff --git a/src/binsrv/filesystem_storage_backend.hpp b/src/binsrv/filesystem_storage_backend.hpp index a44b13d..1b540d0 100644 --- a/src/binsrv/filesystem_storage_backend.hpp +++ b/src/binsrv/filesystem_storage_backend.hpp @@ -26,8 +26,11 @@ namespace binsrv { -class [[nodiscard]] filesystem_storage_backend : public basic_storage_backend { +class [[nodiscard]] filesystem_storage_backend final + : public basic_storage_backend { public: + static constexpr std::size_t max_memory_object_size{1048576U}; + static constexpr std::string_view uri_schema{"file"}; explicit filesystem_storage_backend(const boost::urls::url_view_base &uri); @@ -46,7 +49,8 @@ class [[nodiscard]] filesystem_storage_backend : public basic_storage_backend { void do_put_object(std::string_view name, util::const_byte_span content) override; - void do_open_stream(std::string_view name) override; + void do_open_stream(std::string_view name, + storage_backend_open_stream_mode mode) override; void do_write_data_to_stream(util::const_byte_span data) override; void do_close_stream() override; diff --git a/src/binsrv/s3_error.cpp b/src/binsrv/s3_error.cpp new file mode 100644 index 0000000..e021fe5 --- /dev/null +++ b/src/binsrv/s3_error.cpp @@ -0,0 +1,45 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. 
+// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/s3_error.hpp" + +#include +#include + +namespace binsrv { + +const std::error_category &s3_category() noexcept { + class [[nodiscard]] category_impl : public std::error_category { + public: + [[nodiscard]] const char *name() const noexcept override { + return "aws_s3_crt"; + } + [[nodiscard]] std::string message(int code) const override { + return "AWS_S3_CRT_" + std::to_string(code); + } + }; + + static const category_impl instance; + return instance; +} + +s3_error::s3_error(int native_error_code) + : std::system_error{make_s3_error_code(native_error_code)} {} +s3_error::s3_error(int native_error_code, const std::string &what) + : std::system_error{make_s3_error_code(native_error_code), what} {} +s3_error::s3_error(int native_error_code, const char *what) + : std::system_error{make_s3_error_code(native_error_code), what} {} + +} // namespace binsrv diff --git a/src/binsrv/s3_error.hpp b/src/binsrv/s3_error.hpp new file mode 100644 index 0000000..62f7187 --- /dev/null +++ b/src/binsrv/s3_error.hpp @@ -0,0 +1,40 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_S3_ERROR_HPP +#define BINSRV_S3_ERROR_HPP + +#include +#include + +namespace binsrv { + +[[nodiscard]] const std::error_category &s3_category() noexcept; + +[[nodiscard]] inline std::error_code +make_s3_error_code(int native_error_code) noexcept { + return {native_error_code, s3_category()}; +} + +class [[nodiscard]] s3_error : public std::system_error { +public: + explicit s3_error(int native_error_code); + s3_error(int native_error_code, const std::string &what); + s3_error(int native_error_code, const char *what); +}; + +} // namespace binsrv + +#endif // BINSRV_S3_ERROR_HPP diff --git a/src/binsrv/s3_error_helpers_private.cpp b/src/binsrv/s3_error_helpers_private.cpp new file mode 100644 index 0000000..c9bea5d --- /dev/null +++ b/src/binsrv/s3_error_helpers_private.cpp @@ -0,0 +1,68 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. 
+// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/s3_error_helpers_private.hpp" + +#include +#include +#include +#include + +#include + +#include "binsrv/s3_error.hpp" + +#include "util/exception_location_helpers.hpp" + +namespace binsrv { + +[[noreturn]] void +raise_s3_error_from_outcome(std::string_view user_message, + const Aws::S3Crt::S3CrtError &sdk_error, + std::source_location location) { + std::string message{}; + if (!user_message.empty()) { + message += user_message; + message += ": "; + } + + message += '<'; + message += sdk_error.GetExceptionName(); + message += "> - "; + message += sdk_error.GetMessage(); + message += " (IP "; + message += sdk_error.GetRemoteHostIpAddress(); + message += ", RequestID "; + message += sdk_error.GetRequestId(); + message += ", ResponseCode "; + // TODO: in c++23 change to std::to_underlying() + auto http_status_code = sdk_error.GetResponseCode(); + message += std::to_string( + static_cast<std::underlying_type_t<decltype(http_status_code)>>( + http_status_code)); + message += ')'; + + // default value std::source_location::current() for the 'location' + // parameter is specified in the declaration of this function + // and passed to the util::exception_location's constructor + // instead of calling util::exception_location's default constructor + // because otherwise the location will always point to this + // particular line on this particular file regardless of the actual + // location from where this function is called + util::exception_location(location).raise<s3_error>( + static_cast<int>(sdk_error.GetErrorType()), message); +} + +} // namespace binsrv diff --git a/src/binsrv/s3_error_helpers_private.hpp b/src/binsrv/s3_error_helpers_private.hpp new file mode 100644 index 0000000..3870a98 --- /dev/null +++ b/src/binsrv/s3_error_helpers_private.hpp @@ -0,0 +1,32 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details.
+// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_S3_ERROR_HELPERS_PRIVATE_HPP +#define BINSRV_S3_ERROR_HELPERS_PRIVATE_HPP + +#include +#include + +#include + +namespace binsrv { + +[[noreturn]] void raise_s3_error_from_outcome( + std::string_view user_message, const Aws::S3Crt::S3CrtError &sdk_error, + std::source_location location = std::source_location::current()); + +} // namespace binsrv + +#endif // BINSRV_S3_ERROR_HELPERS_PRIVATE_HPP diff --git a/src/binsrv/s3_storage_backend.cpp b/src/binsrv/s3_storage_backend.cpp index 00a6c4d..9b91f6a 100644 --- a/src/binsrv/s3_storage_backend.cpp +++ b/src/binsrv/s3_storage_backend.cpp @@ -15,46 +15,393 @@ #include "binsrv/s3_storage_backend.hpp" +#include #include #include +#include +#include +#include +#include #include +#include #include #include #include +#include +#include #include #include #include +#include + #include +#include + +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + // TODO: remove this include when switching to clang-18 where transitive // IWYU 'export' pragmas are handled properly #include "binsrv/basic_storage_backend_fwd.hpp" +#include "binsrv/s3_error_helpers_private.hpp" #include "util/byte_span.hpp" #include "util/exception_location_helpers.hpp" -#include "util/impl_helpers.hpp" + +namespace { + +class aws_context_base {
public: + aws_context_base() { Aws::InitAPI(options_); } + aws_context_base(const aws_context_base &) = delete; + aws_context_base &operator=(const aws_context_base &) = delete; + aws_context_base(aws_context_base &&) = delete; + aws_context_base &operator=(aws_context_base &&) = delete; + ~aws_context_base() { Aws::ShutdownAPI(options_); } + + [[nodiscard]] const Aws::SDKOptions &get_options() const noexcept { + return options_; + } + +private: + Aws::SDKOptions options_; +}; + +} // namespace namespace binsrv { +struct simple_aws_credentials { + std::string access_key_id; + std::string secret_access_key; +}; + +struct qualified_object_path { + std::string bucket; + std::filesystem::path object_path; +}; + +class s3_storage_backend::aws_context : private aws_context_base { +public: + enum class construction_alternative { bucket, region }; + using stream_factory_type = std::function<std::iostream *()>; + using stream_handler_type = std::function<void(std::size_t, std::iostream &)>; + + aws_context(const simple_aws_credentials &credentials, + construction_alternative alternative, const std::string &param); + + aws_context(const aws_context &) = delete; + aws_context &operator=(const aws_context &) = delete; + aws_context(aws_context &&) = delete; + aws_context &operator=(aws_context &&) = delete; + ~aws_context() = default; + + using aws_context_base::get_options; + [[nodiscard]] bool has_credentials() const noexcept { + return !credentials_.IsEmpty(); + } + + [[nodiscard]] const std::string &get_region() const noexcept { + return configuration_.region; + } + + [[nodiscard]] std::string get_bucket_region(const std::string &bucket) const; + + [[nodiscard]] std::string + get_object_into_string(const qualified_object_path &source) const; + + void + get_object_into_file(const qualified_object_path &source, + const std::filesystem::path &content_file_path) const; + + void put_object_from_stream(const qualified_object_path &dest, + std::iostream &content_stream) const; + + void
put_object_from_span(const qualified_object_path &dest, + util::const_byte_span content) const; + + [[nodiscard]] storage_object_name_container + list_objects(const qualified_object_path &prefix); + +private: + using iostream_ptr = std::shared_ptr<std::iostream>; + // TODO: do not store secret_access_key in plain + Aws::Auth::AWSCredentials credentials_; + Aws::S3Crt::ClientConfiguration configuration_; + using s3_crt_client_ptr = std::unique_ptr<Aws::S3Crt::S3CrtClient>; + s3_crt_client_ptr client_; + + void get_object_internal(const qualified_object_path &source, + const stream_factory_type &stream_factory, + const stream_handler_type &stream_handler) const; +}; + +s3_storage_backend::aws_context::aws_context( + const simple_aws_credentials &credentials, + construction_alternative alternative, const std::string &param) + : aws_context_base{}, + credentials_{credentials.access_key_id, credentials.secret_access_key}, + configuration_{} { + // if the provided construction_alternative is 'region', treat param as + // AWS S3 region and initialize S3 client with it; + // otherwise (if the construction_alternative is 'bucket'), treat param as + // AWS S3 bucket name, leave the 'region' field in the configuration class + // in its default state ("us-east-1") and try to detect AWS S3 region + // from the bucket location + if (alternative == construction_alternative::region) { + configuration_.region = param; + } + client_ = + std::make_unique<Aws::S3Crt::S3CrtClient>(credentials_, configuration_); + if (alternative == construction_alternative::region) { + return; + } + + configuration_.region = get_bucket_region(param); + // reset to nullptr first to make sure we do not have two clients + // simultaneously + client_.reset(); + client_ = + std::make_unique<Aws::S3Crt::S3CrtClient>(credentials_, configuration_); +} + +[[nodiscard]] std::string s3_storage_backend::aws_context::get_bucket_region( + const std::string &bucket) const { + Aws::S3Crt::Model::GetBucketLocationRequest bucket_location_request{}; + bucket_location_request.SetBucket(bucket); + const auto bucket_location_outcome{ + client_->GetBucketLocation(bucket_location_request)}; + if (!bucket_location_outcome.IsSuccess()) { + raise_s3_error_from_outcome("cannot identify bucket location", + bucket_location_outcome.GetError()); + } + const auto model_region = + bucket_location_outcome.GetResult().GetLocationConstraint(); + return Aws::S3Crt::Model::BucketLocationConstraintMapper:: + GetNameForBucketLocationConstraint(model_region); +} + +[[nodiscard]] std::string +s3_storage_backend::aws_context::get_object_into_string( + const qualified_object_path &source) const { + std::string content; + auto stream_handler{[&content](std::size_t content_length, + std::iostream &content_stream) { + // TODO: check object length in advance before calling GetObject + // (with HeadObject, for instance) + if (content_length > max_memory_object_size) { + util::exception_location().raise( + "S3 object is too large to be loaded in memory"); + } + + content.resize(content_length); + if (!content_stream.read(std::data(content), + static_cast<std::streamsize>(content_length))) { + util::exception_location().raise( + "cannot read S3 object content into a string"); + } + assert(content_stream.gcount() == + static_cast<std::streamsize>(content_length)); + }}; + get_object_internal(source, {}, stream_handler); + + return content; +} + +void s3_storage_backend::aws_context::get_object_into_file( + const qualified_object_path &source, + const std::filesystem::path &content_file_path) const { + auto stream_factory{[&content_file_path]() -> std::iostream * { + return Aws::New<Aws::FStream>(
"GetObjectStreamFactoryAllocationTag", content_file_path, + std::ios_base::in | std::ios_base::out | std::ios_base::binary | + std::ios_base::trunc); + }}; + std::size_t response_content_length{}; + auto stream_handler{ + [&response_content_length](std::size_t content_length, + std::iostream &content_stream) { + content_stream.seekg(0, std::ios_base::end); + + const auto end_position{ + static_cast(content_stream.tellg())}; + if (!std::in_range(end_position) || + static_cast(end_position) != content_length) { + util::exception_location().raise( + "cannot read S3 object content into a file"); + } + response_content_length = content_length; + }}; + + get_object_internal(source, stream_factory, stream_handler); + assert(std::filesystem::file_size(content_file_path) == + response_content_length); +} + +void s3_storage_backend::aws_context::put_object_from_stream( + const qualified_object_path &dest, std::iostream &content_stream) const { + Aws::S3Crt::Model::PutObjectRequest put_object_request; + put_object_request.SetBucket(dest.bucket); + put_object_request.SetKey(dest.object_path.generic_string()); + + // making an shared pointer with noop deleter using aliasing constructor + const iostream_ptr wrapped_content_stream{iostream_ptr{}, &content_stream}; -void s3_storage_backend::options_deleter::operator()(void *ptr) const noexcept { - auto *casted_ptr{static_cast(ptr)}; - Aws::ShutdownAPI(*casted_ptr); - // deleting via std::default_delete to avoid - // cppcoreguidelines-owning-memory warnings - using delete_helper = std::default_delete; - delete_helper{}(casted_ptr); + put_object_request.SetBody(wrapped_content_stream); + + const auto put_object_outcome{client_->PutObject(put_object_request)}; + + if (!put_object_outcome.IsSuccess()) { + raise_s3_error_from_outcome("cannot put object into S3 bucket", + put_object_outcome.GetError()); + } +} + +void s3_storage_backend::aws_context::put_object_from_span( + const qualified_object_path &dest, util::const_byte_span content) const { + std::string materialized_content{util::as_string_view(content)}; + std::stringstream content_stream{std::move(materialized_content), + std::ios_base::in | std::ios_base::out | + std::ios_base::binary}; + put_object_from_stream(dest, content_stream); +} + +[[nodiscard]] storage_object_name_container +s3_storage_backend::aws_context::list_objects( + const qualified_object_path &prefix) { + storage_object_name_container result; + + Aws::S3Crt::Model::ListObjectsV2Request list_objects_request; + list_objects_request.SetBucket(prefix.bucket); + + // for the list operation request the prefix must not start with "/"" + + // moreover, in order to exclude false positives in situations when + // both "/foo" and "/foobar" directories are present and we need to list + // the content of the "/foo" directory, the prefix should include a + // trailing slash "foo/" + + // for the "/" prefix path we should set the prefix to an empty string / + // not set the prefix at all + + // relative path of a default-constructed object / "" / "/" is an empty + // path + auto prefix_path{prefix.object_path.relative_path()}; + + // appending an empty path to an existing path will just add a trailing + // separator, e.g. 
"foo" -> "foo/" (for en empty path this operation has + // no effect) + prefix_path /= std::filesystem::path{}; + + const auto prefix_str{prefix_path.generic_string()}; + + if (!prefix_str.empty()) { + list_objects_request.SetPrefix(prefix_str); + } + + const auto list_objects_outcome{client_->ListObjectsV2(list_objects_request)}; + if (!list_objects_outcome.IsSuccess()) { + raise_s3_error_from_outcome("cannot list objects in the specified bucket", + list_objects_outcome.GetError()); + } + const auto &list_objects_result = list_objects_outcome.GetResult(); + // TODO: implement receiving the rest of the list + if (list_objects_result.GetIsTruncated()) { + util::exception_location().raise( + "too many objects in the specified bucket"); + } + + auto model_key_count = list_objects_result.GetKeyCount(); + if (!std::in_range(model_key_count)) { + util::exception_location().raise( + "invalid key count in the list objects result"); + } + auto key_count{static_cast(model_key_count)}; + + const auto &model_objects{list_objects_result.GetContents()}; + if (key_count != std::size(model_objects)) { + util::exception_location().raise( + "key count does not match the number of objects in the list objects " + "result"); + } + result.reserve(key_count); + + for (const auto &model_object : model_objects) { + // if the prefix is set, the list of objects in the response will include + // the prefix itself (as a directory) with zero size - it neeeds to be + // skipped + + // moreover, we need to remove the prefix itself from the object paths + std::string_view key{model_object.GetKey()}; + if (!prefix_str.empty()) { + if (!key.starts_with(prefix_str)) { + util::exception_location().raise( + "encountered an object with unexpected prefix"); + } + key.remove_prefix(prefix_str.size()); + } + if (!key.empty()) { + result.emplace(key, model_object.GetSize()); + } + } + + return result; +} + +void s3_storage_backend::aws_context::get_object_internal( + const qualified_object_path &source, + const stream_factory_type &stream_factory, + const stream_handler_type &stream_handler) const { + Aws::S3Crt::Model::GetObjectRequest get_object_request; + if (stream_factory) { + get_object_request.SetResponseStreamFactory(stream_factory); + } + get_object_request.SetBucket(source.bucket); + get_object_request.SetKey(source.object_path.generic_string()); + + const auto get_object_outcome{client_->GetObject(get_object_request)}; + + if (!get_object_outcome.IsSuccess()) { + raise_s3_error_from_outcome("cannot get object from S3 bucket", + get_object_outcome.GetError()); + } + const auto &get_object_result{get_object_outcome.GetResult()}; + auto &content_stream{get_object_result.GetBody()}; + + const auto model_content_length{get_object_result.GetContentLength()}; + if (!std::in_range(model_content_length)) { + util::exception_location().raise( + "invalid S3 object content length"); + } + + const auto content_length{static_cast(model_content_length)}; + + if (stream_handler) { + stream_handler(content_length, content_stream); + } } s3_storage_backend::s3_storage_backend(const boost::urls::url_view_base &uri) - : access_key_id_{}, secret_access_key_{}, root_path_{}, - options_{new Aws::SDKOptions} { - Aws::InitAPI(*options_deimpl::get(options_)); + : bucket_{}, root_path_{}, current_name_{}, uuid_generator_{}, + current_tmp_file_path_{}, tmp_fstream_{}, impl_{} { + // TODO: take into account S3 limits (like 5GB single file upload) - // "s3://:@/" for AWS S3 + // "s3://[:@][.]/" + // for AWS S3 if (uri.scheme_id() != 
boost::urls::scheme::unknown || uri.scheme() != uri_schema) { util::exception_location().raise( @@ -64,24 +411,41 @@ s3_storage_backend::s3_storage_backend(const boost::urls::url_view_base &uri) util::exception_location().raise( "s3 URI must have host"); } - if (uri.host().find('.') != std::string::npos) { - util::exception_location().raise( - "s3 URI host must be a single bucket name"); - } + + static constexpr char host_delimiter{'.'}; bucket_ = uri.host(); + std::string region{}; + const auto host_fnd{bucket_.find(host_delimiter)}; + if (host_fnd != std::string::npos) { + region.assign(bucket_, host_fnd + 1); + bucket_.resize(host_fnd); + if (bucket_.empty()) { + util::exception_location().raise( + "s3 URI bucket name must not be empty"); + } + if (region.empty()) { + util::exception_location().raise( + "s3 URI region must not be empty"); + } + if (region.find(host_delimiter) != std::string::npos) { + util::exception_location().raise( + "s3 URI region must not be a qualified name"); + } + } if (uri.has_port()) { util::exception_location().raise( "s3 URI must not have port"); } + simple_aws_credentials credentials; if (uri.has_userinfo()) { if (!uri.has_password()) { util::exception_location().raise( "s3 URI must have either both user and password or none"); } - access_key_id_ = uri.user(); - secret_access_key_ = uri.password(); + credentials.access_key_id = uri.user(); + credentials.secret_access_key = uri.password(); } if (uri.has_query()) { util::exception_location().raise( @@ -93,48 +457,129 @@ s3_storage_backend::s3_storage_backend(const boost::urls::url_view_base &uri) } root_path_ = uri.path(); + if (!region.empty()) { + impl_ = std::make_unique( + credentials, aws_context::construction_alternative::region, region); + } else { + impl_ = std::make_unique( + credentials, aws_context::construction_alternative::bucket, bucket_); + } +} + +s3_storage_backend::~s3_storage_backend() { + if (!tmp_fstream_.is_open()) { + return; + } + // bugprone-empty-catch should not be that strict in destructors + try { + close_stream_internal(); + } catch (...) 
{ // NOLINT(bugprone-empty-catch) + } } [[nodiscard]] storage_object_name_container s3_storage_backend::do_list_objects() { - storage_object_name_container result; - return result; + return impl_->list_objects({.bucket = bucket_, .object_path = root_path_}); } [[nodiscard]] std::string -s3_storage_backend::do_get_object(std::string_view /*name*/) { - static constexpr std::size_t file_size{8}; - std::string file_content(file_size, 'x'); - return file_content; +s3_storage_backend::do_get_object(std::string_view name) { + return impl_->get_object_into_string( + {.bucket = bucket_, .object_path = get_object_path(name)}); } -void s3_storage_backend::do_put_object(std::string_view /*name*/, - util::const_byte_span /*content*/) {} +void s3_storage_backend::do_put_object(std::string_view name, + util::const_byte_span content) { + impl_->put_object_from_span( + {.bucket = bucket_, .object_path = get_object_path(name)}, content); +} -void s3_storage_backend::do_open_stream(std::string_view /*name*/) {} +void s3_storage_backend::do_open_stream(std::string_view name, + storage_backend_open_stream_mode mode) { + assert(!tmp_fstream_.is_open()); + current_name_ = name; + current_tmp_file_path_ = generate_tmp_file_path(); -void s3_storage_backend::do_write_data_to_stream( - util::const_byte_span /*data*/) {} + if (mode == storage_backend_open_stream_mode::append) { + impl_->get_object_into_file( + {.bucket = bucket_, .object_path = get_object_path(current_name_)}, + current_tmp_file_path_); + } -void s3_storage_backend::do_close_stream() {} + tmp_fstream_.open(current_tmp_file_path_, + std::ios_base::in | std::ios_base::out | + std::ios_base::binary | std::ios_base::app); + if (!tmp_fstream_.is_open()) { + util::exception_location().raise( + "cannot open temporary file for S3 object body stream"); + } +} + +void s3_storage_backend::do_write_data_to_stream(util::const_byte_span data) { + assert(tmp_fstream_.is_open()); + const auto data_sv = util::as_string_view(data); + if (!tmp_fstream_.write(std::data(data_sv), + static_cast<std::streamsize>(std::size(data_sv)))) { + util::exception_location().raise( + "cannot write data to the underlying stream file"); + } +} + +void s3_storage_backend::do_close_stream() { + assert(tmp_fstream_.is_open()); + close_stream_internal(); + current_tmp_file_path_.clear(); + current_name_.clear(); +} [[nodiscard]] std::string s3_storage_backend::do_get_description() const { std::string res{"AWS S3 (SDK "}; - const auto &casted_options = *options_deimpl::get(options_); - res += std::to_string(casted_options.sdkVersion.major); + const auto &options = impl_->get_options(); + res += std::to_string(options.sdkVersion.major); res += '.'; - res += std::to_string(casted_options.sdkVersion.minor); + res += std::to_string(options.sdkVersion.minor); res += '.'; - res += std::to_string(casted_options.sdkVersion.patch); + res += std::to_string(options.sdkVersion.patch); res += ") - "; - res += "bucket: "; + res += "region: "; + res += impl_->get_region(); + res += ", bucket: "; res += bucket_; res += ", path: "; res += root_path_.generic_string(); res += ", credentials: "; - res += (has_credentials() ? "***hidden***" : "none"); + res += (impl_->has_credentials() ?
"***hidden***" : "none"); return res; } + +[[nodiscard]] std::filesystem::path +s3_storage_backend::get_object_path(std::string_view name) const { + auto result{root_path_}; + result /= name; + return result; +} + +[[nodiscard]] std::filesystem::path +s3_storage_backend::generate_tmp_file_path() { + // TODO: change this temp_directory_path() to a configuration parameter + auto result{std::filesystem::temp_directory_path()}; + result /= boost::uuids::to_string(uuid_generator_()); + return result; +} + +void s3_storage_backend::close_stream_internal() { + // S3 object may already exist, it is OK to overwrite it here + impl_->put_object_from_stream( + {.bucket = bucket_, .object_path = get_object_path(current_name_)}, + tmp_fstream_); + + tmp_fstream_.close(); + // we allow std::filesystem::remove() here to fail - worst case scenario + // we will have a temorary file not removed + std::error_code remove_ec; + std::filesystem::remove(current_tmp_file_path_, remove_ec); +} + } // namespace binsrv diff --git a/src/binsrv/s3_storage_backend.hpp b/src/binsrv/s3_storage_backend.hpp index 8e2fc61..3e8585a 100644 --- a/src/binsrv/s3_storage_backend.hpp +++ b/src/binsrv/s3_storage_backend.hpp @@ -17,48 +17,50 @@ #define BINSRV_S3_STORAGE_BACKEND_HPP #include +#include +#include #include #include +#include + #include "binsrv/basic_storage_backend.hpp" // IWYU pragma: export namespace binsrv { -class [[nodiscard]] s3_storage_backend : public basic_storage_backend { +class [[nodiscard]] s3_storage_backend final : public basic_storage_backend { public: + static constexpr std::size_t max_memory_object_size{1048576U}; + static constexpr std::string_view uri_schema{"s3"}; explicit s3_storage_backend(const boost::urls::url_view_base &uri); + s3_storage_backend(const s3_storage_backend &) = delete; + s3_storage_backend &operator=(const s3_storage_backend &) = delete; + s3_storage_backend(s3_storage_backend &&) = delete; + s3_storage_backend &operator=(s3_storage_backend &&) = delete; + ~s3_storage_backend() override; [[nodiscard]] const std::string &get_bucket() const noexcept { return bucket_; } - [[nodiscard]] const std::string &get_access_key_id() const noexcept { - return access_key_id_; - } - [[nodiscard]] const std::string &get_secret_access_key() const noexcept { - return secret_access_key_; - } - [[nodiscard]] bool has_credentials() const noexcept { - return !access_key_id_.empty(); - } [[nodiscard]] const std::filesystem::path &get_root_path() const noexcept { return root_path_; } private: - std::string access_key_id_; - // TODO: do not store secret_access_key in plain - std::string secret_access_key_; std::string bucket_; std::filesystem::path root_path_; - struct options_deleter { - void operator()(void *ptr) const noexcept; - }; - using options_ptr = std::unique_ptr; - options_ptr options_; + std::string current_name_; + boost::uuids::random_generator uuid_generator_; + std::filesystem::path current_tmp_file_path_; + std::fstream tmp_fstream_; + + class aws_context; + using aws_context_ptr = std::unique_ptr; + aws_context_ptr impl_; [[nodiscard]] storage_object_name_container do_list_objects() override; @@ -66,11 +68,17 @@ class [[nodiscard]] s3_storage_backend : public basic_storage_backend { void do_put_object(std::string_view name, util::const_byte_span content) override; - void do_open_stream(std::string_view name) override; + void do_open_stream(std::string_view name, + storage_backend_open_stream_mode mode) override; void do_write_data_to_stream(util::const_byte_span data) override; void 
do_close_stream() override; [[nodiscard]] std::string do_get_description() const override; + + [[nodiscard]] std::filesystem::path + get_object_path(std::string_view name) const; + [[nodiscard]] std::filesystem::path generate_tmp_file_path(); + void close_stream_internal(); }; } // namespace binsrv diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index 90018fe..a45f97e 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -56,6 +56,17 @@ storage::storage(basic_storage_backend_ptr backend) } } +storage::~storage() { + if (!backend_->is_stream_open()) { + return; + } + // bugprone-empty-catch should not be that strict in destructors + try { + backend_->close_stream(); + } catch (...) { // NOLINT(bugprone-empty-catch) + } +} + [[nodiscard]] bool storage::check_binlog_name(std::string_view binlog_name) noexcept { // TODO: parse binlog name into "base name" and "rotation number" @@ -73,9 +84,11 @@ void storage::open_binlog(std::string_view binlog_name) { "cannot create a binlog with invalid name"); } - backend_->open_stream(binlog_name); + const auto mode{position_ == 0ULL ? storage_backend_open_stream_mode::create + : storage_backend_open_stream_mode::append}; + backend_->open_stream(binlog_name, mode); - if (position_ == 0ULL) { + if (mode == storage_backend_open_stream_mode::create) { // writing the magic binlog footprint only if this is a newly // created file backend_->write_data_to_stream(event::magic_binlog_payload); diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index ef8c3a4..545570e 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -42,7 +42,7 @@ class [[nodiscard]] storage { storage &operator=(storage &&) = delete; // destructor is explicitly defined as default here to complete the rule of 5 - ~storage() = default; + ~storage(); [[nodiscard]] std::string_view get_binlog_name() const noexcept { return binlog_names_.empty() ?
std::string_view{} : binlog_names_.back(); diff --git a/src/easymysql/binlog.cpp b/src/easymysql/binlog.cpp index bf63d1b..73231f5 100644 --- a/src/easymysql/binlog.cpp +++ b/src/easymysql/binlog.cpp @@ -70,7 +70,7 @@ binlog::binlog(connection &conn, std::uint32_t server_id, auto *casted_conn_impl = connection_deimpl::get(conn.impl_); if (mysql_binlog_open(casted_conn_impl, binlog_deimpl::get(impl_)) != 0) { - raise_core_error_from_connection(conn); + raise_core_error_from_connection("cannot open binary log", conn); } } @@ -89,7 +89,7 @@ util::const_byte_span binlog::fetch() { auto *casted_conn_impl = connection_deimpl::get(conn_->impl_); auto *casted_rpl_impl = binlog_deimpl::get(impl_); if (mysql_binlog_fetch(casted_conn_impl, casted_rpl_impl) != 0) { - raise_core_error_from_connection(*conn_); + raise_core_error_from_connection("cannot fetch binlog event", *conn_); } return std::as_bytes( std::span{casted_rpl_impl->buffer, casted_rpl_impl->size}); diff --git a/src/easymysql/connection.cpp b/src/easymysql/connection.cpp index 73e7c5f..a061669 100644 --- a/src/easymysql/connection.cpp +++ b/src/easymysql/connection.cpp @@ -53,7 +53,8 @@ connection::connection(const connection_config &config) /* port */ config.get<"port">(), /* unix socket */ nullptr, /* flags */ 0) == nullptr) { - raise_core_error_from_connection(*this); + raise_core_error_from_connection("cannot establish MySQL connection", + *this); } } @@ -95,7 +96,7 @@ void connection::execute_generic_query_noresult(std::string_view query) { assert(!is_empty()); auto *casted_impl = connection_deimpl::get(impl_); if (mysql_real_query(casted_impl, std::data(query), std::size(query)) != 0) { - raise_core_error_from_connection(*this); + raise_core_error_from_connection("cannot execute query", *this); } } diff --git a/src/easymysql/core_error_helpers_private.cpp b/src/easymysql/core_error_helpers_private.cpp index 4e2586e..18f5272 100644 --- a/src/easymysql/core_error_helpers_private.cpp +++ b/src/easymysql/core_error_helpers_private.cpp @@ -16,6 +16,8 @@ #include "easymysql/core_error_helpers_private.hpp" #include +#include +#include #include @@ -34,7 +36,8 @@ struct raise_access { }; [[noreturn]] void -raise_core_error_from_connection(const connection &conn, +raise_core_error_from_connection(std::string_view user_message, + const connection &conn, std::source_location location) { // default value std::source_location::current() for the 'location' // parameter is specified in the declaration of this function @@ -45,8 +48,15 @@ raise_core_error_from_connection(const connection &conn, // location from where this function is called auto *casted_impl = connection_deimpl::get_const_casted(raise_access::get(conn)); + std::string message{}; + if (!user_message.empty()) { + message += user_message; + message += ": "; + } + message += mysql_error(casted_impl); + util::exception_location(location).raise( - static_cast(mysql_errno(casted_impl)), mysql_error(casted_impl)); + static_cast(mysql_errno(casted_impl)), message); } } // namespace easymysql diff --git a/src/easymysql/core_error_helpers_private.hpp b/src/easymysql/core_error_helpers_private.hpp index 88a743d..08546f1 100644 --- a/src/easymysql/core_error_helpers_private.hpp +++ b/src/easymysql/core_error_helpers_private.hpp @@ -17,12 +17,14 @@ #define EASYMYSQL_CORE_ERROR_HELPERS_PRIVATE_HPP #include "easymysql/connection_fwd.hpp" + #include +#include namespace easymysql { [[noreturn]] void raise_core_error_from_connection( - const connection &conn, + std::string_view user_message, const 
connection &conn, std::source_location location = std::source_location::current()); } // namespace easymysql