[WIP]feat: use user space buff to write binlog instead of Mmap #2848
base: unstable
2 remove auto Flush() for every write
Walkthrough
The recent changes enhance file handling and logging capabilities.
Sequence Diagram(s)
sequenceDiagram
participant User
participant Binlog
participant BufferedWritableFile
User->>Binlog: Start logging
Binlog->>BufferedWritableFile: Create buffered writable file
BufferedWritableFile-->>Binlog: Return writable file
Binlog->>BufferedWritableFile: Append data
BufferedWritableFile-->>Binlog: Confirm append
Binlog->>BufferedWritableFile: Flush buffered data
BufferedWritableFile-->>Binlog: Confirm flush
Binlog->>User: Logging successful
Actionable comments posted: 5
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (5)
- include/pika_binlog.h (3 hunks)
- include/pika_define.h (1 hunks)
- src/pika_binlog.cc (6 hunks)
- src/pstd/include/env.h (2 hunks)
- src/pstd/src/env.cc (3 hunks)
Files skipped from review due to trivial changes (1)
- include/pika_define.h
Additional comments not posted (16)
include/pika_binlog.h (2)
16-16: Header inclusion approved. The inclusion of dispatch_thread.h is necessary for the TimerTaskThread class.
80-80: Additions approved, ensure proper testing. The FlushBufferedFile method and timer_task_thread_ member variable are appropriate additions for the new buffering mechanism. Ensure that the new functionality is thoroughly tested. Also applies to: 113-114
src/pstd/include/env.h (2)
71-73: Addition of NewBufferedWritableFile approved. The NewBufferedWritableFile function enhances file handling by allowing buffered writes, which can improve performance.
74-76: Addition of BufferedAppendableFile approved. The BufferedAppendableFile function complements NewBufferedWritableFile by providing similar buffering capabilities for appending data, enhancing performance and flexibility.
src/pika_binlog.cc (7)
84-84: Use of NewBufferedWritableFile approved. The use of NewBufferedWritableFile in the Binlog constructor is appropriate for implementing buffered writes, which can improve performance.
115-115: Use of BufferedAppendableFile approved. The use of BufferedAppendableFile in the Binlog constructor is appropriate for implementing buffered writes for appending data, enhancing performance.
125-127: Addition of timer task for flushing buffer approved. The addition of a timer task to flush the buffer every 500 milliseconds ensures that buffered data is periodically written to disk, improving data integrity and performance.
132-132: Stopping of timer task in destructor approved. Stopping the timer task in the Binlog destructor ensures proper resource management and prevents potential memory leaks or dangling operations.
136-144: Addition of FlushBufferedFile method approved. The FlushBufferedFile method is crucial for the new buffering mechanism, ensuring that buffered data is written to disk. The method acquires a lock before flushing to ensure thread safety.
227-227: Use of NewBufferedWritableFile in Put method approved. The use of NewBufferedWritableFile in the Put method is appropriate for implementing buffered writes, which can improve performance.
404-404: Use of BufferedAppendableFile in SetProducerStatus method approved. The use of BufferedAppendableFile in the SetProducerStatus method is appropriate for implementing buffered writes for appending data, enhancing performance. Also applies to: 443-443
src/pstd/src/env.cc (5)
484-492: Constructor looks good. The constructor initializes the class members correctly and asserts that the file pointer is not null.
543-548: Flush method looks good. The Flush method correctly handles potential errors from fflush.
550-560: Sync method looks good. The Sync method correctly handles potential errors from fsync.
562-562: Trim method looks good. The Trim method correctly returns Status::OK as a no-op.
564-565: Filesize method looks good. The Filesize method correctly returns the current file size.
Status Close() override {
  if (fclose(file_) != 0) {
    return IOError("fclose failed: " + filename_, errno);
  }
  file_ = nullptr;
  return Status::OK();
}
Handle potential errors from fclose in Close method.
The Close method should handle potential errors from fclose to ensure that any issues during file closing are logged or managed.
- if (fclose(file_) != 0) {
- return IOError("fclose failed: " + filename_, errno);
- }
+ if (fclose(file_) != 0) {
+ auto err = errno;
+ file_ = nullptr;
+ return IOError("fclose failed: " + filename_, err);
}
file_ = nullptr;
return Status::OK();
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Status Close() override {
  if (fclose(file_) != 0) {
    auto err = errno;
    file_ = nullptr;
    return IOError("fclose failed: " + filename_, err);
  }
  file_ = nullptr;
  return Status::OK();
}
~BufferedWritableFile() override {
  if (file_) {
    BufferedWritableFile::Close();
  }
  if(buffer_) {
    free(buffer_);
  }
}
Handle potential errors from fclose in the destructor.
The destructor should handle potential errors from fclose to ensure that any issues during file closing are logged or managed.
- if (file_) {
- BufferedWritableFile::Close();
- }
+ if (file_) {
+ auto s = BufferedWritableFile::Close();
+ if (!s.ok()) {
+ LOG(WARNING) << "Error closing file in destructor: " << s.ToString();
+ }
+ }
Committable suggestion
~BufferedWritableFile() override {
  if (file_) {
    auto s = BufferedWritableFile::Close();
    if (!s.ok()) {
      LOG(WARNING) << "Error closing file in destructor: " << s.ToString();
    }
  }
  if(buffer_) {
    free(buffer_);
  }
}
Status Append(const Slice& data) override {
  if (!file_) {
    return IOError("fwrite target: " + filename_ + " is not opened", errno);
  }
  const char* src = data.data();
  size_t left = data.size();
  int32_t max_retries = 4;
  int retry_count = 0;

  while (left > 0) {
    size_t written = fwrite(src, sizeof(char), left, file_);
    if (written == 0) {
      if (ferror(file_)) {
        int err_num = errno;
        clearerr(file_);
        return IOError("fwrite error with " + filename_, err_num);
      }
      if (errno == ENOSPC || ++retry_count > max_retries) {
        return IOError(filename_, errno);
      }
    }
    src += written;
    left -= written;
    curr_file_size_ += written;
    retry_count = 0;
  }

  return Status::OK();
}
Improve handling of ENOSPC and retry logic in Append method.
The Append method should handle ENOSPC more gracefully and ensure that retries are limited to avoid infinite loops.
- if (errno == ENOSPC || ++retry_count > max_retries) {
+ if (errno == ENOSPC) {
+ return Status::Error("No space left on device: " + filename_);
+ }
+ if (++retry_count > max_retries) {
return IOError(filename_, errno);
}
Committable suggestion
Status Append(const Slice& data) override {
  if (!file_) {
    return IOError("fwrite target: " + filename_ + " is not opened", errno);
  }
  const char* src = data.data();
  size_t left = data.size();
  int32_t max_retries = 4;
  int retry_count = 0;
  while (left > 0) {
    size_t written = fwrite(src, sizeof(char), left, file_);
    if (written == 0) {
      if (ferror(file_)) {
        int err_num = errno;
        clearerr(file_);
        return IOError("fwrite error with " + filename_, err_num);
      }
      if (errno == ENOSPC) {
        return Status::Error("No space left on device: " + filename_);
      }
      if (++retry_count > max_retries) {
        return IOError(filename_, errno);
      }
    }
    src += written;
    left -= written;
    curr_file_size_ += written;
    retry_count = 0;
  }
  return Status::OK();
}
Status BufferedAppendableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
                              int32_t user_space_buf_size_bytes, int64_t offset) {
  const int fd = open(fname.c_str(), O_RDWR | O_CLOEXEC, 0644);
  if (fd < 0) {
    return IOError(fname, errno);
  }

  FILE* file = fdopen(fd, "r+");
  if (file == nullptr) {
    close(fd);
    return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
  }

  if (user_space_buf_size_bytes < 0) {
    fclose(file);
    return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedAppendableFile");
  }
  char* buf = nullptr;
  int32_t r = 0;
  if (user_space_buf_size_bytes != 0) {
    buf = (char*)malloc(user_space_buf_size_bytes);
    if (!buf) {
      fclose(file);
      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
    }
    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
  } else {
    r = setvbuf(file, nullptr, _IONBF, 0);
  }
  if (r != 0) {
    fclose(file);
    return IOError("Failed to set user space buffer for " + fname, errno);
  }

  // Move the file pointer to the specified offset
  if (fseek(file, offset, SEEK_SET) != 0) {
    fclose(file);
    return IOError("Failed to seek to the specified offset in " + fname, errno);
  }

  result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, offset, buf);
  return Status::OK();
}
Improve error handling in BufferedAppendableFile function.
The function should handle errors from buffer allocation and fseek more gracefully.
- if (user_space_buf_size_bytes != 0) {
- buf = (char*)malloc(user_space_buf_size_bytes);
- if (!buf) {
- fclose(file);
- return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
- }
- r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
- } else {
- r = setvbuf(file, nullptr, _IONBF, 0);
- }
- if (r != 0) {
- fclose(file);
- return IOError("Failed to set user space buffer for " + fname, errno);
- }
+ if (user_space_buf_size_bytes != 0) {
+ buf = (char*)malloc(user_space_buf_size_bytes);
+ if (!buf) {
+ fclose(file);
+ return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
+ }
+ r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
+ } else {
+ r = setvbuf(file, nullptr, _IONBF, 0);
+ }
+ if (r != 0) {
+ free(buf);
+ fclose(file);
+ return IOError("Failed to set user space buffer for " + fname, errno);
}
Committable suggestion
Status BufferedAppendableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
                              int32_t user_space_buf_size_bytes, int64_t offset) {
  const int fd = open(fname.c_str(), O_RDWR | O_CLOEXEC, 0644);
  if (fd < 0) {
    return IOError(fname, errno);
  }
  FILE* file = fdopen(fd, "r+");
  if (file == nullptr) {
    close(fd);
    return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
  }
  if (user_space_buf_size_bytes < 0) {
    fclose(file);
    return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedAppendableFile");
  }
  char* buf = nullptr;
  int32_t r = 0;
  if (user_space_buf_size_bytes != 0) {
    buf = (char*)malloc(user_space_buf_size_bytes);
    if (!buf) {
      fclose(file);
      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
    }
    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
  } else {
    r = setvbuf(file, nullptr, _IONBF, 0);
  }
  if (r != 0) {
    free(buf);
    fclose(file);
    return IOError("Failed to set user space buffer for " + fname, errno);
  }
  // Move the file pointer to the specified offset
  if (fseek(file, offset, SEEK_SET) != 0) {
    fclose(file);
    return IOError("Failed to seek to the specified offset in " + fname, errno);
  }
  result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, offset, buf);
  return Status::OK();
}
Status NewBufferedWritableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
                               int32_t user_space_buf_size_bytes) {
  const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC, 0644);
  if (fd < 0) {
    return IOError(fname, errno);
  }

  FILE* file = fdopen(fd, "w+");
  if (file == nullptr) {
    close(fd);
    return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
  }

  if (user_space_buf_size_bytes < 0) {
    fclose(file);
    return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedWritableFile");
  }
  char* buf = nullptr;
  int32_t r = 0;
  if (user_space_buf_size_bytes != 0) {
    buf = (char*)malloc(user_space_buf_size_bytes);
    if (!buf) {
      fclose(file);
      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
    }
    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
  } else {
    r = setvbuf(file, nullptr, _IONBF, 0);
  }
  if (r != 0) {
    fclose(file);
    return IOError("Failed to set user space buffer for " + fname, errno);
  }

  // the file was trancated if it was existing for syscall open use flag "O_TRUNC",
  result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, 0, buf);
  return Status::OK();
}
Improve error handling in NewBufferedWritableFile function.
The function should handle errors from buffer allocation more gracefully.
- if (user_space_buf_size_bytes != 0) {
- buf = (char*)malloc(user_space_buf_size_bytes);
- if (!buf) {
- fclose(file);
- return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
- }
- r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
- } else {
- r = setvbuf(file, nullptr, _IONBF, 0);
- }
- if (r != 0) {
- fclose(file);
- return IOError("Failed to set user space buffer for " + fname, errno);
- }
+ if (user_space_buf_size_bytes != 0) {
+ buf = (char*)malloc(user_space_buf_size_bytes);
+ if (!buf) {
+ fclose(file);
+ return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
+ }
+ r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
+ } else {
+ r = setvbuf(file, nullptr, _IONBF, 0);
+ }
+ if (r != 0) {
+ free(buf);
+ fclose(file);
+ return IOError("Failed to set user space buffer for " + fname, errno);
}
Committable suggestion
Status NewBufferedWritableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
                               int32_t user_space_buf_size_bytes) {
  const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC, 0644);
  if (fd < 0) {
    return IOError(fname, errno);
  }
  FILE* file = fdopen(fd, "w+");
  if (file == nullptr) {
    close(fd);
    return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
  }
  if (user_space_buf_size_bytes < 0) {
    fclose(file);
    return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedWritableFile");
  }
  char* buf = nullptr;
  int32_t r = 0;
  if (user_space_buf_size_bytes != 0) {
    buf = (char*)malloc(user_space_buf_size_bytes);
    if (!buf) {
      fclose(file);
      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
    }
    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
  } else {
    r = setvbuf(file, nullptr, _IONBF, 0);
  }
  if (r != 0) {
    free(buf);
    fclose(file);
    return IOError("Failed to set user space buffer for " + fname, errno);
  }
  // the file was trancated if it was existing for syscall open use flag "O_TRUNC",
  result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, 0, buf);
  return Status::OK();
}
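Update: the earlier fwrite-based batching implementation was incomplete. I have redesigned it as a self-managed buffer scheme, described below. The work is scheduled and is expected to start once higher-priority work is finished; the code will eventually be submitted in this PR.
Pika Binlog batched-write design:
Summary: Binlog is currently written through Mmap, which triggers page faults frequently in the binlog write path (Perf also shows that the PageFault overhead is significant). This design uses a self-maintained user-space buffer to accumulate Binlog entries and write them in batches. There are two flush triggers: first, the buffer is full; second, a check runs every 500 ms and flushes manually if more than 900 ms have passed since the last flush. After the locked version is implemented, the buffer will be further optimized to be lock-free.
Preliminary tests (NVMe disk, benchmark with 10 threads, Pika QPS at its upper limit):
With Binlog disabled, Pika's maximum QPS increases by 30-40%.
With a crude batched Binlog write implemented using fwrite (512 KB user-space buffer), the average QPS gain reaches 18% when Pika runs with multiple instances.
Perf shows that do_page_fault calls account for roughly half of the Binlog Put cost; the other half is lock overhead (yes, Binlog is also one big lock).
These tests show that converting Binlog to batched writes pays off.
Stage 1: buffer implementation details:
Maintain a self-managed buffer of configurable size (fwrite is not used because some logic must run at flush time and fwrite provides no callback interface). Every flush performs an install-offset operation: it updates the latest binlog offset held in the Master's memory (as long as a slave's sync progress is behind this offset, the Master keeps pushing Binlog to that slave, so the offset is updated only after the buffer has been flushed) and persists it.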
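A minimal sketch of the stage 1 buffer described above, under stated assumptions: BatchBuffer, FlushFn, Append and MaybeFlushOnTimer are hypothetical names chosen for illustration, and the flush callback stands in for "write the batch to the binlog file, then install the offset". It is a locked version only and performs no file I/O itself.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical sketch: BatchBuffer and its members are illustrative names,
// not code from this PR.
class BatchBuffer {
 public:
  using Clock = std::chrono::steady_clock;
  // Receives the batched bytes. A real callback would write them to the binlog
  // file and then update and persist the binlog offset (install offset).
  using FlushFn = std::function<void(const std::string&)>;

  BatchBuffer(std::size_t capacity, FlushFn on_flush)
      : capacity_(capacity), on_flush_(std::move(on_flush)), last_flush_(Clock::now()) {
    assert(capacity_ > 0);
    buf_.reserve(capacity_);
  }

  // Trigger 1: flush first when the incoming record would not fit.
  // A record larger than the capacity is still accepted and goes out with the next flush.
  void Append(const std::string& record) {
    std::lock_guard<std::mutex> guard(mu_);
    if (!buf_.empty() && buf_.size() + record.size() > capacity_) {
      FlushLocked(Clock::now());
    }
    buf_.append(record);
  }

  // Trigger 2: meant to be called by a periodic timer (every 500 ms in the text).
  // Flushes only if data is pending and the last flush is at least max_age old.
  bool MaybeFlushOnTimer(Clock::time_point now, std::chrono::milliseconds max_age) {
    std::lock_guard<std::mutex> guard(mu_);
    if (buf_.empty() || now - last_flush_ < max_age) {
      return false;
    }
    FlushLocked(now);
    return true;
  }

 private:
  // Caller must hold mu_.
  void FlushLocked(Clock::time_point now) {
    on_flush_(buf_);
    buf_.clear();
    last_flush_ = now;
  }

  const std::size_t capacity_;
  FlushFn on_flush_;
  Clock::time_point last_flush_;
  std::mutex mu_;
  std::string buf_;
};
```

Passing the current time into MaybeFlushOnTimer keeps the age check easy to exercise without sleeping; a timer thread would simply pass Clock::now() and a 900 ms threshold.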
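Stage 2: lock-free optimization of the buffer
Once the locked version is complete, optimize the buffer to be lock-free. A straightforward idea is to maintain an atomic variable that marks the current write position in the buffer. Each arriving thread adds the size of its own Binlog body to the atomic variable (advancing the write position, which amounts to claiming a region of memory) and then writes into the range it claimed. This way, while one thread is still writing, the next thread can also add to the atomic variable to obtain its own write region. Another consideration: with concurrent writes the buffer may fill up quickly, so flushing the buffer itself may become the bottleneck. We could borrow RocksDB's mm and imm (memtable and immutable memtable) concept: whenever the buffer fills up, switch to a new one to keep accepting writes, while the full one, like an imm, is flushed to disk asynchronously.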
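The reservation idea above can be sketched as follows. This is an illustration under assumptions, not the planned implementation: ReservingBuffer, Write, Committed and kFull are hypothetical names, and the committed counter is an addition of this sketch so that a flusher could tell when every reserved range has actually been filled. Switching to a fresh buffer when the current one is full is left out.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical sketch of "claim a region with an atomic add, then copy into it".
class ReservingBuffer {
 public:
  // Returned by Write when the record does not fit in the remaining space.
  static constexpr std::size_t kFull = static_cast<std::size_t>(-1);

  explicit ReservingBuffer(std::size_t capacity) : data_(capacity) {}

  // Claims len bytes by advancing the shared write position, then copies src
  // into the claimed range. Concurrent callers receive disjoint ranges, so the
  // copies themselves need no lock.
  std::size_t Write(const char* src, std::size_t len) {
    const std::size_t start = reserved_.fetch_add(len, std::memory_order_relaxed);
    if (start > data_.size() || len > data_.size() - start) {
      return kFull;  // The caller would switch to a new buffer and flush this one.
    }
    std::memcpy(data_.data() + start, src, len);
    // Publish the copied bytes; a flusher reads this counter with acquire ordering.
    committed_.fetch_add(len, std::memory_order_release);
    return start;
  }

  // Number of bytes whose copy has completed.
  std::size_t Committed() const { return committed_.load(std::memory_order_acquire); }

 private:
  std::vector<char> data_;
  std::atomic<std::size_t> reserved_{0};
  std::atomic<std::size_t> committed_{0};
};
```

One design point this makes visible: reserving space and finishing the copy are separate events, so a flush must wait until the committed byte count catches up with the successfully reserved ranges, otherwise it could write out a region that another thread is still filling.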
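Key problem A:
If the process crashes, all Binlog data held in the master's user-space buffer is lost. When a slave later reconnects, it performs incremental sync directly from the Binlog, but the portion that was in the user-space buffer was never written to disk on the master. The slave cannot consume that portion, so master and slave become inconsistent. (MySQL has this problem too and solves it with two-phase commit, but Pika must remain non-intrusive to RocksDB and cannot modify the WAL, so it cannot adopt two-phase commit.) The solution is to combine a persisted flag with the binlog offset to detect the loss and, when Binlog has been lost, force the slave to connect via full sync.
Solution to key problem A:
Each time the buffer goes from empty to non-empty, persist a flag binlog_buff_empty = false; each time the buffer is flushed, set the flag back to true. The flag can live in the same file as the persisted binlog offset (the manifest in the binlog directory, operated on via mmap). On every Pika startup, first read binlog_buff_empty from the manifest and keep a copy in memory. When a slave attempts an incremental connection, add a check before the regular logic: if binlog_buff_empty is false, the master crashed earlier and lost Binlog, so force the slave to do a full sync.
This raises problem B: while Pika is running, the in-memory copy of the flag stays false, which means every slave connection is forced to full sync. If a slave that has completed full sync disconnects a while later because of a network glitch and then reconnects, it is also forced into full sync, even though such a slave could safely sync incrementally.
Solution 1 to problem B: on each Pika startup, also read the binlog offset from the manifest and keep a copy in memory (call it offset B). When a slave attempts an incremental connection, first check whether binlog_buff_empty is false; if so, compare the binlog offset carried by the slave with offset B. If the slave's binlog offset is less than or equal to offset B, then by the binlog consumption logic the slave would need to consume the lost portion of the binlog, so force it to full sync. Otherwise, if the slave's binlog offset is greater than offset B, the slave has already connected through full sync and is merely reconnecting after a disconnect, so allow incremental sync.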
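The decision in solution 1 can be written down as a small pure function. The sketch below is an illustration under assumptions: the type and function names (BinlogOffset, RecoveredManifest, DecideSyncMode) are hypothetical, and representing an offset as a (file number, byte offset) pair compared lexicographically is an assumption of this sketch rather than something stated in the description.

```cpp
#include <cassert>
#include <cstdint>
#include <tuple>

// Hypothetical representation of a binlog position (assumption of this sketch).
struct BinlogOffset {
  uint32_t filenum;
  uint64_t offset;
};

inline bool LessOrEqual(const BinlogOffset& a, const BinlogOffset& b) {
  return std::tie(a.filenum, a.offset) <= std::tie(b.filenum, b.offset);
}

// Values read from the manifest at startup and kept in memory.
struct RecoveredManifest {
  bool binlog_buff_empty;  // false means buffered binlog was lost in a crash
  BinlogOffset offset_b;   // persisted binlog offset at startup ("offset B")
};

enum class SyncMode { kIncremental, kFull };

// Check that runs before the regular incremental-sync logic.
inline SyncMode DecideSyncMode(const RecoveredManifest& m, const BinlogOffset& slave_offset) {
  if (m.binlog_buff_empty) {
    return SyncMode::kIncremental;  // Nothing was lost; regular logic applies.
  }
  if (LessOrEqual(slave_offset, m.offset_b)) {
    return SyncMode::kFull;  // The slave would need the lost binlog range.
  }
  return SyncMode::kIncremental;  // The slave already did a full sync after the crash.
}
```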
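Problem B also has a solution 2: when Pika finds at startup that binlog was lost, it erases its own ReplicationID, so subsequent slave connections fail with an error and require slaveof force to reconnect (manual handling). Whether to do this depends on how DBAs actually operate, so their advice should be sought.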
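--------------------------------- Divider ------------------------------------------
This PR mainly does: conversion of binlog to batched writes
A related issue: users have reported that on IO devices with poor performance, writing Binlog causes latency spikes.
The original code uses Mmap to map 1 MB at a time for appending to the Binlog. Although Mmap avoids frequent system calls, appending to the Binlog keeps triggering page faults (the 1 MB mapping is effectively Copy On Write; even if some pre-allocation happens, it cannot offset the disadvantage of writing the binlog one entry at a time). Specifically: Mmap maps onto the page cache, and every page used for writing Binlog is a new page (not in the page cache), so each time memcpy reaches a page boundary it triggers a page fault (the target page is not yet allocated). Very likely the page cache must also evict to make room for the new page, which may involve writing back dirty pages. Because the binlog is written one entry at a time, there is a gap between two consecutive writebacks (it takes a while to fill a page), so the pages from consecutive writebacks most likely cannot be merged into a single IO.
The end result: memcpy writes several binlog entries and fills a page, a page fault is triggered, the page cache writes back one dirty page, and a while later the cycle repeats. For the block device, a small IO arrives every so often, with gaps in between, so the small IOs cannot be merged into large ones. Even setting aside the cost of the page faults themselves, small IOs cannot exploit the internal parallelism of an SSD. In this scenario the benefits of batched writes are: 1. no frequent page faults or switches into kernel mode; 2. although writes still go through the page cache and still trigger eviction, a batch of pages is written back at once and can be merged into large IOs.
This PR batches Binlog writes. In tests, with a single RocksDB instance it improves write QPS by 10% on average; with multiple RocksDB instances (3 instances) under heavy write load it improves write QPS by 15-18% on average.
Current state: binlog is written with fwrite backed by a 512 KB user-space buffer, and a timer task actively flushes every 500 ms.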
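For readers unfamiliar with the stdio mechanism this relies on, here is a small standalone sketch of writing through a caller-supplied user-space buffer with setvbuf, fwrite and fflush. WriteBuffered is a hypothetical helper written for this illustration; it is not the PR's BufferedWritableFile and it omits the periodic flush timer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <string>

// Hypothetical helper: writes payload to path through a stdio stream that uses
// a caller-allocated buffer of buf_size bytes. Returns true on success.
bool WriteBuffered(const std::string& path, const std::string& payload, std::size_t buf_size) {
  assert(buf_size > 0);
  FILE* f = std::fopen(path.c_str(), "wb");
  if (f == nullptr) {
    return false;
  }
  char* buf = static_cast<char*>(std::malloc(buf_size));
  if (buf == nullptr) {
    std::fclose(f);
    return false;
  }
  // setvbuf must be called before any other operation on the stream.
  if (std::setvbuf(f, buf, _IOFBF, buf_size) != 0) {
    std::fclose(f);
    std::free(buf);
    return false;
  }
  // With full buffering, fwrite only copies into buf until the buffer fills up.
  bool ok = std::fwrite(payload.data(), 1, payload.size(), f) == payload.size();
  // fflush hands the buffered bytes to the kernel; it does not force them to
  // stable storage (that would take fsync on the underlying descriptor).
  ok = (std::fflush(f) == 0) && ok;
  ok = (std::fclose(f) == 0) && ok;
  // The stream may use buf until it is closed, so free it only afterwards.
  std::free(buf);
  return ok;
}
```

Calling WriteBuffered("binlog.tmp", data, 512 * 1024) mirrors the 512 KB buffer size mentioned above; the file name here is only an example.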
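TODO:
1. Unify Pika's timer tasks: attach the timer thread to PikaServer and gather the scattered timer tasks onto that thread (such as the blpop expiry scan and the periodic binlog flush).
2. Test performance and reliability further, take the various edge cases into account, and list both the benefits and the costs clearly.
The Master process crash scenario can be considered separately (we have not heard of this happening so far, and the probability should be extremely small). The following logic could be implemented:
After each flush completes, persist a flag binlog_flushed = true. When the next write request arrives and writes data into the buffer, persist binlog_flushed = false. That is, if data in the buffer was lost, the persisted binlog_flushed will be false. In that case Pika removes its own ReplicationID during recovery, so slaves cannot connect incrementally and must delete their old ReplicationID and reconnect with a full sync. The Binlog that lost a chunk of buffered data is then never used, and full sync restores master-slave consistency.
Of course, in the long term, making the binlog idempotent and using it directly for crash recovery would resolve these problems, but that work is not easy.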
Summary by CodeRabbit
New Features
BufferedWritableFile class provides improved error handling and robust file operations.
Bug Fixes
Binlog class.
Documentation