From ca09937c142bbec52f7c002f1afaef3a657f851f Mon Sep 17 00:00:00 2001 From: cfanbo Date: Sat, 17 Aug 2024 00:33:58 +0800 Subject: [PATCH] init --- .github/workflows/rust.yaml | 61 ++ .gitignore | 13 + Cargo.toml | 21 + LICENSE | 201 +++++++ README.md | 112 ++++ config.toml | 11 + install.sh | 101 ++++ src/cli.rs | 1 + src/cli/cli.rs | 39 ++ src/config.rs | 306 ++++++++++ src/entry.rs | 2 + src/entry/entry.rs | 433 ++++++++++++++ src/entry/hint.rs | 179 ++++++ src/error.rs | 1 + src/error/error.rs | 15 + src/lib.rs | 10 + src/main.rs | 21 + src/server.rs | 873 +++++++++++++++++++++++++++ src/store.rs | 2 + src/store/file.rs | 73 +++ src/store/store.rs | 1128 +++++++++++++++++++++++++++++++++++ src/util.rs | 94 +++ src/util/lock.rs | 58 ++ src/util/time.rs | 30 + 24 files changed, 3785 insertions(+) create mode 100644 .github/workflows/rust.yaml create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 config.toml create mode 100755 install.sh create mode 100644 src/cli.rs create mode 100644 src/cli/cli.rs create mode 100644 src/config.rs create mode 100644 src/entry.rs create mode 100644 src/entry/entry.rs create mode 100644 src/entry/hint.rs create mode 100644 src/error.rs create mode 100644 src/error/error.rs create mode 100644 src/lib.rs create mode 100644 src/main.rs create mode 100644 src/server.rs create mode 100644 src/store.rs create mode 100644 src/store/file.rs create mode 100644 src/store/store.rs create mode 100644 src/util.rs create mode 100644 src/util/lock.rs create mode 100644 src/util/time.rs diff --git a/.github/workflows/rust.yaml b/.github/workflows/rust.yaml new file mode 100644 index 0000000..5e8da92 --- /dev/null +++ b/.github/workflows/rust.yaml @@ -0,0 +1,61 @@ +name: Rust + +permissions: + contents: write + +on: + push: + tags: + - v[0-9]+.* + pull_request: + tags: + - v[0-9]+.* +env: + CARGO_TERM_COLOR: always + +jobs: + create-release: + runs-on: ubuntu-latest + 
steps: + - uses: actions/checkout@v4 + - uses: taiki-e/create-gh-release-action@v1 + with: + token: ${{ secrets.GITHUB_TOKEN }} + + upload-assets: + needs: create-release + strategy: + matrix: + include: + - target: x86_64-unknown-linux-gnu + os: ubuntu-latest + - target: aarch64-unknown-linux-gnu + os: ubuntu-latest + - target: x86_64-apple-darwin + os: macos-latest + - target: aarch64-apple-darwin + os: macos-latest + - target: x86_64-pc-windows-msvc + os: windows-latest + - target: aarch64-pc-windows-msvc + os: windows-latest + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v4 + - name: Build + run: cargo build --verbose + - name: Run tests + run: cargo test --verbose + - name: Install cross-compilation tools + uses: taiki-e/setup-cross-toolchain-action@v1 + with: + target: ${{ matrix.target }} + if: startsWith(matrix.os, 'ubuntu') + - uses: taiki-e/upload-rust-binary-action@v1 + with: + bin: minkv + target: ${{ matrix.target }} + tar: unix + zip: windows + archive: $bin-$tag-$target + token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f728d79 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +/target +*.txt +*.bin +**/*.rs.bk +*.pdb +Cargo.lock +debug/ +target/ +settings.json +/.zed +dbdata +.DS_Store +lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..f363862 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "minkv" +version = "0.0.1" +edition = "2021" + +[dependencies] +chrono = "0.4.38" +crc32fast = "1.4.2" +env_logger = "0.11.5" +log = "0.4.22" +serde = { version = "1.0.204", features = ["derive"] } +# dashmap = "5.0" +fs4 = { version = "0.9", features = ["sync"] } +rand = "0.8.5" +regex = "1.10.6" +toml = "0.8.19" +anyhow = "1.0.86" +tempfile = "3.12.0" +clap = { version = "4.5.10", features = ["derive"] } +redis-protocol = { version = "5.0.1", features = ["std", "codec"] } +fs2 = "0.4.3" diff --git a/LICENSE b/LICENSE new file mode 100644 
index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative 
Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/README.md b/README.md new file mode 100644 index 0000000..f6c77cb --- /dev/null +++ b/README.md @@ -0,0 +1,112 @@ +# MinKV + +`minkv` 是一款基于 [Bitcask](https://riak.com/assets/bitcask-intro.pdf) 存储模型构建的轻量级、快速、可靠的 KV 存储引擎,同时支持持久化存储。 + +其设计思想基于 `LSM (Log-Structured Merge-tree)` 数据结构和算法,此算法目前已经非常的成熟,它在许多现代数据库系统和存储引擎中被广泛使用,例如 [LevelDB](https://github.com/google/leveldb)、[RocksDB](https://rocksdb.org/)、[Cassandra](https://github.com/apache/cassandra) 等。 + +# 安装 + +## 手动下载安装 + +下载并解压安装包,将 `minkv` 可执行文件移动到 `/usr/local/bin/` 目录或将其添加到 `$PATH` 环境变量。 + +下载地址: + +- [MacOS x86_64](https://githubfiles.oss-cn-shanghai.aliyuncs.com/minkv/minkv-latest-x86_64-apple-darwin.tar.gz) +- [MacOS aarch64](https://githubfiles.oss-cn-shanghai.aliyuncs.com/minkv/minkv-latest-aarch64-apple-darwin.tar.gz) +- [Linux x86_64](https://githubfiles.oss-cn-shanghai.aliyuncs.com/minkv/minkv-latest-x86_64-unknown-linux-gnu.tar.gz) +- [Linux aarch64](https://githubfiles.oss-cn-shanghai.aliyuncs.com/minkv/minkv-latest-aarch64-unknown-linux-gnu.tar.gz) +- [Windows x86_64](https://githubfiles.oss-cn-shanghai.aliyuncs.com/minkv/minkv-latest-x86_64-pc-windows-msvc.zip) +- [Windows aarch64](https://githubfiles.oss-cn-shanghai.aliyuncs.com/minkv/minkv-latest-aarch64-pc-windows-msvc.zip) + +所有发布版本请[点击这里](https://github.com/cfanbo/minkv/releases) + +## shell 脚本安装 + +您可以在macOS终端或Linux shell提示符中粘贴以下命令并执行,注意权限问题。 + +```shell +/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/cfanbo/minkv/HEAD/install.sh)" +``` + +如果您使用的是 `zsh`的话,命令行前面更改为 `/bin/zsh`即可。 + +# 配置 + +创建配置文件 +```toml +db_dir = "/server/dbdata" +file = "data" +file_max_size = 10240000 + +# 当更新key达到指定数量时刷新 +sync_keys = 1 + +[server] +address = "127.0.0.1" +port = 6381 +``` + +字段意义 + +- `db_dir` 存放数据库文件目录路径 + +- `file` 表示数据文件名,至少会存在一个 `data` 文件,如果文件进行了分隔,则可能产生 `data.N`文件 + +- `file_max_size` 表示文件大小达到这个值的时候,将自动进行文件分隔,生成新的数据文件,文件名为 `data.N` + +- `sync_keys` 表示写入内容时达到多少次写或删除操作,会刷新缓存到磁盘。 + + 如果指定为`0`,则表示启用操作系统的缓存刷新磁盘机制 + 如果指定为`1`
,则表示每次更新文件后自动调用 `flush()` 函数,对内容进行持久化 + +- `server.address` 表示服务监听 IP 地址 + +- `server.port` 表示服务监听端口号 + +对于 `sync_keys` 的设置一定要根据业务访问量情况设置,如果设置为 `1`,会频繁的进行文件内容同步,可能性能会有一些影响。如果设置的值过大,可能存在意外断电导致部分内容未持久化磁盘,如果此值过大,超出了系统默认的同步周期,系统也会自动同步缓存至磁盘的。 + + +# 启动服务 + +```shell +$ minkv serve -c config.toml +Listening on 127.0.0.1:6381 +``` + +# 使用 + +客户端与服务端通讯基于 redis 协议开发,因此可以直接使用 redis 客户端进行访问,只需要指定对应的 `ip:port` 即可。 + +目前支持的指令 + +- get +- set +- del +- mset +- mget +- getset +- incr +- decr +- incrby +- decrby +- append +- exists +- expire +- pexpire +- expireat +- pexpireat +- ttl +- pttl +- persist +- keys + +以上指令的用法与 redis 完全一样。 + +# 备份与恢复 + +对于备份只需要简单的复制数据库目录 `db_dir` 里的所有文件即可,同样恢复也是将所有备份文件放在这个目录即可。数据库文件主要由 `data`、`data.N` 数据文件和 `hint.N` 索引文件组成,文件名中 `N` 表示文件编号,索引文件编号与数据文件编号是一一对应的。 + +# 其它 + +后续根据使用场景,可能会支持更多的指令。 diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..fc4a1de --- /dev/null +++ b/config.toml @@ -0,0 +1,11 @@ +file_max_size = 10240000 +db_dir = "./dbdata" +file = "mysql" +merge_file_num = 2 + +# 当更新一定数量时刷新 +sync_keys = 5 + +[server] +address = '127.0.0.1' +port = 6381 diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..0172b1d --- /dev/null +++ b/install.sh @@ -0,0 +1,101 @@ +#!/usr/bin/env bash + +#!/usr/bin/env bash + +set -e +o pipefail + +show_help() { +cat << EOF + + MinKV Simple KV Database + + -h Display this help and exit + + -V VERSION Custom Software version. Default is 'latest' + +EOF +} + +abort() { + printf "%s\n" "$@" >&2 + exit 1 +} + +# First check OS. +OS="$(uname)" +if [[ "${OS}" == "Linux" ]] +then + CLI_ON_LINUX=1 +elif [[ "${OS}" == "Darwin" ]] +then + CLI_ON_MACOS=1 +else + abort "Currently is only supported on macOS and Linux." 
+fi + +VERSION="latest" + +if [ $# != 0 ]; +then + while getopts "hV:-" o + do + case "$o" in + "h") + show_help + exit 0; + ;; + "V") + VERSION=v"$OPTARG" + ;; + *) + echo -e "Unexpected flag not supported" + exit 1 + ;; + esac + done +fi + + +echo -e " +............888888888888888= ......................... =888888888888888D............. +...........O88888888888888 .............................. D88888888888888............ +..........D888888888888ZI: ............................... Z88D8888888888D........... +.........+88888888 ......................................... 8888888888888D.......... +.........+88888888 .......... Simple KV Database ........... O88888888888D.......... +.........+88888888 .... .... O88888888888D.......... +.........+88888888 ..... https://github.com/cfanbo/minkv .... O88888888888D.......... +.........+88888888D.......................................... +88888888888D.......... +..........D888888888888O+ ................................ ?D888888888888O........... +...........O88888888888888 .............................. O88888888888888............ +...........:D888888888888888 .......................... D888888888888888............. 
+" + +echo $VERSION; + +if [[ -n "${CLI_ON_MACOS-}" ]] +then + UNAME_MACHINE="$(/usr/bin/uname -m)" + if [[ "${UNAME_MACHINE}" == "arm64" || "${UNAME_MACHINE}" == "aarch64" ]] + then + curl -O -fsSL https://githubfiles.oss-cn-shanghai.aliyuncs.com/minkv/minkv-"$VERSION"-aarch64-apple-darwin.tar.gz + tar zxf minkv-"$VERSION"-aarch64-apple-darwin.tar.gz + else + curl -O -fsSL https://githubfiles.oss-cn-shanghai.aliyuncs.com/minkv/minkv-"$VERSION"-x86_64-apple-darwin.tar.gz + tar zxf minkv-"$VERSION"-x86_64-apple-darwin.tar.gz + fi + mv ./minkv /usr/local/bin/ +fi + +if [[ -n "${CLI_ON_LINUX-}" ]] +then + UNAME_MACHINE="$(/usr/bin/uname -m)" + if [[ "${UNAME_MACHINE}" == "arm64" || "${UNAME_MACHINE}" == "aarch64" ]] + then + curl -O -fsSL https://githubfiles.oss-cn-shanghai.aliyuncs.com/minkv/minkv-"$VERSION"-aarch64-unknown-linux-gnu.tar.gz + tar zxf minkv-"$VERSION"-aarch64-unknown-linux-gnu.tar.gz + else + curl -O -fsSL https://githubfiles.oss-cn-shanghai.aliyuncs.com/minkv/minkv-"$VERSION"-x86_64-unknown-linux-gnu.tar.gz + tar zxf minkv-"$VERSION"-x86_64-unknown-linux-gnu.tar.gz + fi + mv ./minkv /usr/local/bin/ +fi diff --git a/src/cli.rs b/src/cli.rs new file mode 100644 index 0000000..4f77372 --- /dev/null +++ b/src/cli.rs @@ -0,0 +1 @@ +pub mod cli; diff --git a/src/cli/cli.rs b/src/cli/cli.rs new file mode 100644 index 0000000..cf1f455 --- /dev/null +++ b/src/cli/cli.rs @@ -0,0 +1,39 @@ +use super::super::server; +use clap::arg; +use clap::{Parser, Subcommand}; +use std::path::PathBuf; + +#[derive(Parser)] +#[command(arg_required_else_help = true)] +#[command(version, about, long_about = None)] +struct Cli { + /// Sets a custom config file + #[arg(short, long, value_name = "FILE")] + config: Option, + + #[command(subcommand)] + command: Option, +} + +#[derive(Subcommand)] +enum Commands { + /// startup KV database server + Serve { + /// Sets a custom config file + #[arg(short, long, value_name = "FILE")] + config: Option, + }, +} + +pub fn parse() -> 
anyhow::Result<()> { + env_logger::init(); + + let cli = Cli::parse(); + + match &cli.command { + Some(Commands::Serve { config }) => { + server::start_server(config) + } + None => Ok(()) + } +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..fbfa49e --- /dev/null +++ b/src/config.rs @@ -0,0 +1,306 @@ +use regex::Regex; +use serde::Deserialize; +use std::io::{Error, ErrorKind}; +use std::net::IpAddr; +use std::{ + fs, + path::{Path, PathBuf}, +}; + +#[derive(Debug, Deserialize)] +struct FileConfig { + db_dir: Option, + file: Option, + file_max_size: Option, + sync_keys: Option, + merge_file_num: Option, + server: Option, +} + +#[derive(Debug, Deserialize)] +struct FileConfigServer { + address: Option, + port: Option, +} + +impl TryFrom<&Path> for FileConfig { + type Error = anyhow::Error; + + fn try_from(path: &Path) -> Result { + if !path.exists() { + return Err(Error::new(ErrorKind::NotFound, "config file not exists!").into()); + } + + let body = std::fs::read_to_string(path)?; + let config: FileConfig = toml::from_str(&body)?; + + if let Some(server) = &config.server { + if let Some(address) = &server.address { + if !address.parse::().is_ok() { + return Err(Error::new( + ErrorKind::AddrNotAvailable, + "server address invalid".to_string(), + ) + .into()); + } + } + + if let Some(port) = server.port { + if port < 1 || port > 65535 { + return Err(Error::new( + ErrorKind::InvalidData, + format!("invalid port {}", port).to_string(), + ) + .into()); + } + } + } + + Ok(config) + } +} + +#[derive(Debug, Deserialize, Clone)] +pub struct Config { + db_dir: String, + file: String, + file_max_size: usize, // 字节 + sync_keys: u32, + server: ConfigServer, + merge_file_num: usize, +} +#[derive(Debug, Deserialize, Clone)] +struct ConfigServer { + address: String, + port: u32, +} + +impl Default for Config { + fn default() -> Self { + Config { + db_dir: "./dbdata".to_string(), + file: String::from("data"), + file_max_size: 1024 * 100, + sync_keys: 
0, + server: ConfigServer { + address: "127.0.0.1".to_string(), + port: 6380, + }, + merge_file_num: 10, + } + } +} + +impl TryFrom<&Path> for Config { + type Error = anyhow::Error; + fn try_from(path: &Path) -> Result { + let config: FileConfig = FileConfig::try_from(path)?; + + // fill in with default values + let mut default_config = Config::default(); + if let Some(value) = config.db_dir { + default_config.db_dir = value; + } + + if let Some(value) = config.file { + default_config.file = value; + } + + if let Some(value) = config.file_max_size { + default_config.file_max_size = value as usize; + } + if let Some(value) = config.merge_file_num { + default_config.merge_file_num = value as usize; + } + + if let Some(value) = config.sync_keys { + default_config.sync_keys = value + } + + if let Some(server) = config.server { + if let Some(server_address) = server.address { + default_config.server.address = server_address + } + if let Some(server_port) = server.port { + default_config.server.port = server_port + } + } + + default_config.check()?; + + Ok(default_config) + } +} + +const MERGE_DIR: &str = ".merge"; +const HINT: &str = "hint"; + +impl Config { + pub fn new() -> anyhow::Result { + let config = Config::default(); + config.check()?; + Ok(config) + } + + fn check(&self) -> anyhow::Result<()> { + // check if the datadir exists + { + let path = self.data_dir(); + if !path.exists() { + fs::create_dir_all(path).expect(&format!("Failed to create directory: {:?}", path)); + } + } + + // check if the merge dir exists,测试是否可写 + { + let path = self.merge_dir(); + if !path.exists() { + fs::create_dir_all(&path) + .expect(&format!("Failed to create directory: {:?}", path)); + } + fs::remove_dir_all(path).unwrap(); + } + + Ok(()) + } + + pub fn get_addr_port(&self) -> (String, u32) { + (self.server.address.clone(), self.server.port) + } + + pub fn merge_cleanup(&self) { + let dir = self.merge_dir(); + let path = Path::new(&dir); + fs::remove_dir(path).unwrap(); + } + + 
pub fn file(&self) -> &str { + &self.file + } + + pub fn data_dir(&self) -> &Path { + Path::new(&self.db_dir) + } + + pub fn merge_dir(&self) -> PathBuf { + self.data_dir().join(MERGE_DIR) + } + + pub fn get_merge_filepath_by_seq(&self, idx: u16) -> PathBuf { + self.merge_dir().join(idx.to_string()) + } + + pub fn get_merge_hint_filepath_by_seq(&self, idx: u16) -> PathBuf { + let mut path = self.merge_dir().join(idx.to_string()); + path.set_extension(HINT); + path + } + + pub fn get_hint_filepath_by_seq(&self, idx: u16) -> PathBuf { + let filename = format!("{}.{}", HINT, idx); + let file = self.data_dir().join(filename); + file + } + + pub fn file_max_size(&self) -> usize { + self.file_max_size + } + + pub fn get_filepath_by_seq(&self, idx: u16) -> PathBuf { + let filename = format!("{}.{}", self.file, idx); + let file = self.data_dir().join(filename); + file + } + + pub fn get_active_filepath(&self) -> PathBuf { + self.data_dir().join(self.file()) + } + + pub fn get_next_datafile_seq(&self) -> u16 { + let dir_path = self.data_dir(); + let pattern = format!(r"{}\.(\d+)", self.file); // 动态生成正则表达式 + + // 创建正则表达式 + let re = Regex::new(&pattern).unwrap(); + + // 读取目录中的所有文件 + let mut max_number = 0; + for entry in fs::read_dir(dir_path).unwrap() { + let entry = entry.unwrap(); + let file_name = entry.file_name(); + let file_name_str = file_name.to_string_lossy(); + + // 尝试匹配文件名 + if let Some(caps) = re.captures(&file_name_str) { + if let Some(number_str) = caps.get(1) { + if let Ok(number) = number_str.as_str().parse::() { + // 更新最大编号 + if number > max_number { + max_number = number; + } + } + } + } + } + + // seq increment + max_number as u16 + 1 + } + + pub fn get_next_datafile(&self) -> PathBuf { + let seq = self.get_next_datafile_seq(); + self.get_filepath_by_seq(seq) + } + + pub fn get_merge_file_num(&self) -> usize { + self.merge_file_num + } + + pub fn get_sync_keys_num(&self) -> u32 { + self.sync_keys + } +} + +#[cfg(test)] +mod tests { + use std::{io::Write, 
path::PathBuf}; + use tempfile::NamedTempFile; + + // #[test] + // fn new() { + // let cnf = super::Config::new(); + // let dir = cnf.merge_dir(); + // let tpath = "./dbdata/.merge"; + // assert_eq!(PathBuf::from(tpath), dir); + // } + + // #[test] + // fn get_filepath_by_seq() { + // let idx = 2; + + // let cnf = super::Config::new(); + // let fpath = cnf.get_filepath_by_seq(idx); + // println!("{:?}", fpath); + // assert_eq!(PathBuf::from("./dbdata/data.2"), fpath); + // } + + #[test] + fn config_new_from_file() -> anyhow::Result<()> { + let mut tmpfile = NamedTempFile::new()?; + let config_content = r#" + db_dir = "mysql_data" + [server] + port = 7788 + "#; + writeln!(tmpfile, "{}", config_content)?; + + let fpath = PathBuf::from(tmpfile.path().to_str().unwrap()); + let config = super::Config::try_from(fpath.as_path())?; + assert_eq!(config.db_dir, "mysql_data"); + assert_eq!(config.file, "data"); + assert_eq!(config.file_max_size, 1024 * 100); + assert_eq!(config.server.address, String::from("127.0.0.1")); + assert_eq!(config.server.port, 7788); + Ok(()) + } +} diff --git a/src/entry.rs b/src/entry.rs new file mode 100644 index 0000000..f8f4333 --- /dev/null +++ b/src/entry.rs @@ -0,0 +1,2 @@ +pub mod entry; +pub mod hint; diff --git a/src/entry/entry.rs b/src/entry/entry.rs new file mode 100644 index 0000000..c018ad9 --- /dev/null +++ b/src/entry/entry.rs @@ -0,0 +1,433 @@ +use crate::util; +use crc32fast::Hasher; +use log::*; +use std::fmt; +use std::fs::File; +use std::io::{self, Read, Seek, SeekFrom}; +use std::time::{SystemTime, UNIX_EPOCH}; + +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Op { + Add = 0, + Put = 1, // unused! 
+ Del = 2, +} +impl fmt::Display for Op { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let op_str = match self { + Op::Add => "0", + Op::Put => "1", + Op::Del => "2", + }; + write!(f, "{}", op_str) + } +} +impl Op { + fn from_u8(value: u8) -> Option { + match value { + 0 => Some(Op::Add), + 1 => Some(Op::Put), + 2 => Some(Op::Del), + _ => None, + } + } + fn to_u8(&self) -> u8 { + *self as u8 + } + fn to_le_bytes(&self) -> [u8; 1] { + self.to_u8().to_le_bytes() + } + fn from_le_bytes(bytes: &[u8]) -> Result { + if bytes.len() != 1 { + return Err("Invalid byte array length for Op"); + } + let value = u8::from_le_bytes( + bytes + .try_into() + .map_err(|_| "Invalid byte array conversion")?, + ); + Op::from_u8(value).ok_or("Invalid value for Op") + } +} +impl Default for Op { + fn default() -> Self { + Op::Add + } +} + +// crc (4 bytes) | timestamp (8 bytes) | key_size (4 bytes) | value_size (8 bytes) | op (1 bytes) | key | value +#[derive(Default)] +#[allow(dead_code)] +pub struct Entry { + pub crc: u32, + pub timestamp: u64, + pub key_size: u32, + pub value_size: u64, + op: Op, + pub key: Vec, + pub value: Vec, +} + +pub struct EntryParseResult { + pub value_pos: u64, + pub entry: Entry, +} + +pub struct EntryFile(File); +impl EntryFile { + pub fn new(file: File) -> EntryFile { + EntryFile(file) + } +} + +impl Into> for EntryFile { + fn into(mut self) -> Vec { + // crc (4 bytes) | timestamp (8 bytes) | key_size (4 bytes) | value_size (8 bytes) | op (1 bytes) | key | value + let header_size = Entry::default().header_size(); + let mut buffer = vec![0; header_size]; + + let _ = self.0.seek(SeekFrom::Start(0)); + + let mut offset: usize = 0; + let mut results = Vec::new(); + loop { + // crc = 4 | time = 8 | ksize = 4 | vsize = 8 | op = 1 + match self.0.read_exact(&mut buffer) { + Ok(()) => { + // 读取成功,处理 buffer 的内容 + //debug!("Successfully read {} bytes", buffer.len()); + // 处理 buffer 内容的其他操作 + } + Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof 
=> { + // 文件结束EOF,处理不完整数据 + // debug!("Reached end of file"); + break; // 退出循环 + } + Err(e) => { + // 处理其他 I/O 错误 + debug!("Error reading file: {}", e); + break; // 退出循环 + } + } + + let crc = u32::from_le_bytes( + buffer[..4] + .try_into() + .map_err(|_| "Invalid CRC data") + .unwrap(), + ); + let timestamp = u64::from_le_bytes( + buffer[4..12] + .try_into() + .map_err(|_| "Invalid timestamp data") + .unwrap(), + ); + let key_size = u32::from_le_bytes( + buffer[12..16] + .try_into() + .map_err(|_| "Invalid key_size data") + .unwrap(), + ); + let value_size = u64::from_le_bytes( + buffer[16..24] + .try_into() + .map_err(|_| "Invalid value_size data") + .unwrap(), + ); + + let op = Op::from_le_bytes( + buffer[24..25] + .try_into() + .map_err(|_| "Invalid value_size data") + .unwrap(), + ) + .unwrap(); + + // key + let mut key = vec![0; key_size as usize]; + self.0.read_exact(&mut key).unwrap(); + + // value + let mut value = vec![0; value_size as usize]; + self.0.read_exact(&mut value).unwrap(); + + let entry = Entry { + crc, + timestamp, + key_size, + value_size, + op, + key, + value, + }; + + results.push(EntryParseResult { + value_pos: offset as u64, + entry: entry, + }); + offset += header_size + key_size as usize + value_size as usize; + } + + results + } +} + +impl fmt::Debug for Entry { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Entry {{ crc: {}, timestamp: {}, key_size: {}, value_size: {}, op:{}, key: {:?}, value: {:?} }}", + self.crc, + self.timestamp, + self.key_size, + self.value_size, + self.op, + util::format_bytes_as_str(&self.key), + util:: format_bytes_as_str(&self.value) + ) + } +} + +#[allow(dead_code)] +impl Entry { + pub fn new(key: Vec, value: Vec, timestamp: u64) -> Entry { + // 将 bytes 转为 entry + let key_size = key.len() as u32; + let value_size = value.len() as u64; + // let timestamp = Utc::now().timestamp() as u64; + let op = Op::Add; + // let timestamp = 0; + + let mut entry = Entry { + crc: 0, + 
timestamp, + key_size, + value_size, + op, + key, + value, + }; + entry.crc = entry.calculate_crc(); + + entry + } + + pub fn with_timestamp(mut self, ts: u64) -> Entry { + self.timestamp = ts; + self.refresh_crc(); + self + } + + pub fn set_updated(mut self) -> Entry { + self.op = Op::Put; + self.refresh_crc(); + self + } + pub fn set_removed(mut self) -> Entry { + self.op = Op::Del; + self.refresh_crc(); + self + } + + fn refresh_crc(&mut self) { + self.crc = self.calculate_crc(); + } + + fn calculate_crc(&self) -> u32 { + let mut hasher = Hasher::new(); + hasher.update(&self.timestamp.to_le_bytes()); + hasher.update(&self.key_size.to_le_bytes()); + hasher.update(&self.value_size.to_le_bytes()); + hasher.update(&self.op.to_le_bytes()); + hasher.update(&self.key); + hasher.update(&self.value); + + hasher.finalize() + } + + pub fn is_valid(&self) -> bool { + self.crc == self.calculate_crc() + } + + pub fn is_expired(&self) -> bool { + if self.timestamp == 0 { + return false; + } + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("SystemTime before UNIX EPOCH") + .as_millis() as u64; + + now > self.timestamp + } + + pub fn is_removed(&self) -> bool { + match self.op { + Op::Del => true, + _ => false, + } + } + + // crc (4 bytes) | timestamp (8 bytes) | key_size (4 bytes) | value_size (8 bytes) | op (1 bytes) | key | value + pub fn header_size(&self) -> usize { + let header_size: usize = 2 * std::mem::size_of::() + + 2 * std::mem::size_of::() + + std::mem::size_of::(); + header_size + } + + pub fn size(&self) -> usize { + let total_size: usize = self.header_size() + self.key.len() + self.value.len(); + + total_size + } + + pub fn get_value(&self) -> &Vec { + &self.value + } + + pub fn as_bytes(&self) -> Vec { + let mut result: Vec = Vec::new(); + + // 将各个字段的字节添加到结果中 + result.extend_from_slice(&self.crc.to_le_bytes()); + result.extend_from_slice(&self.timestamp.to_le_bytes()); + result.extend_from_slice(&self.key_size.to_le_bytes()); + 
result.extend_from_slice(&self.value_size.to_le_bytes()); + result.extend_from_slice(&self.op.to_le_bytes()); + result.extend_from_slice(&self.key); + result.extend_from_slice(&self.value); + + result + } +} + +// TryFrom trait +impl TryFrom> for Entry { + type Error = String; + + fn try_from(bytes: Vec) -> Result { + // crc (4 bytes) | timestamp (8 bytes) | key_size (4 bytes) | value_size (8 bytes) | op (1 bytes) | key | value + let header_size = Entry::default().header_size(); + if bytes.len() < header_size { + // 除去 key 和 value 之外的固定长度 + return Err("Input Vec is too short".into()); + } + + let crc = u32::from_le_bytes(bytes[..4].try_into().map_err(|_| "Invalid CRC data")?); + let timestamp = u64::from_le_bytes( + bytes[4..12] + .try_into() + .map_err(|_| "Invalid timestamp data")?, + ); + let key_size = u32::from_le_bytes( + bytes[12..16] + .try_into() + .map_err(|_| "Invalid key_size data")?, + ); + let value_size = u64::from_le_bytes( + bytes[16..24] + .try_into() + .map_err(|_| "Invalid value_size data")?, + ); + let op = Op::from_le_bytes(bytes[24..25].try_into().map_err(|_| "Invalid op data")?)?; + + if bytes.len() < (header_size + key_size as usize + value_size as usize) { + return Err("Input Vec is too short for the key and value".into()); + } + + let key = bytes[header_size..(header_size + key_size as usize)].to_vec(); + let value = bytes[(header_size + key_size as usize) + ..(header_size + key_size as usize + value_size as usize)] + .to_vec(); + + Ok(Entry { + crc, + timestamp, + key_size, + value_size, + op, + key, + value, + }) + } +} + +// Into trait +impl Into> for Entry { + fn into(self) -> Vec { + // 计算总容量(4 字段的长度 + key 和 value 的长度) + let total_size = Entry::default().size(); + println!("inro {:?}", self.op.to_le_bytes()); + println!("inro2 {:?}", self.key_size.to_le_bytes()); + + let mut result = Vec::with_capacity(total_size); + result.extend_from_slice(&self.crc.to_le_bytes()); + result.extend_from_slice(&self.timestamp.to_le_bytes()); + 
result.extend_from_slice(&self.key_size.to_le_bytes()); + result.extend_from_slice(&self.value_size.to_le_bytes()); + result.extend_from_slice(&self.op.to_le_bytes()); + result.extend(self.key); + result.extend(self.value); + println!("{:?}", result); + result + } +} + +#[cfg(test)] +mod tests { + use super::Op; + use crate::entry::entry::Entry; + + #[test] + fn entry_new() { + let key = "foo"; + let value = "1"; + + let entry = Entry::new(key.as_bytes().to_vec(), value.as_bytes().to_vec(), 0); + assert_eq!(entry.key, key.as_bytes().to_vec()); + assert_eq!(entry.value, value.as_bytes().to_vec()); + + let crc = 12345; + assert_ne!(crc, entry.crc); + } + + #[test] + fn entry_into_bytes() { + let key1 = "name".as_bytes(); + let value1 = "li".as_bytes(); + + // create an entry + let entry = Entry::new(key1.to_vec(), value1.to_vec(), 0); + let e_key = &entry.key; + let e_value = &entry.value; + let e_key_size = entry.key_size as u32; + let e_value_size = entry.value_size as u64; + let e_timestamp = entry.timestamp; + let op = Op::Add; + assert_eq!(key1.to_vec(), *e_key); + assert_eq!(value1.to_vec(), *e_value); + assert_eq!(key1.len() as u32, e_key_size); + assert_eq!(value1.len() as u64, e_value_size); + assert_eq!(Op::Add, entry.op); + + // entry into + let bytes: Vec = entry.into(); + assert_ne!(0, bytes.len()); + + // entry try_from + let rentry = Entry::try_from(bytes); + assert!(rentry.is_ok()); + + let ins = rentry.unwrap(); + assert_eq!(key1.to_vec(), ins.key); + assert_eq!(value1.to_vec(), ins.value); + assert_eq!(e_key_size, ins.key_size); + assert_eq!(e_value_size, ins.value_size); + assert_eq!(e_timestamp, ins.timestamp); + assert_eq!(op, ins.op); + assert!(ins.is_valid()); + } +} diff --git a/src/entry/hint.rs b/src/entry/hint.rs new file mode 100644 index 0000000..9e49e77 --- /dev/null +++ b/src/entry/hint.rs @@ -0,0 +1,179 @@ +use std::fmt; +use std::fs::File; +use std::io::{self, Read}; + +#[derive(Default)] +pub struct Hint { + pub timestamp: u64, + pub 
key_size: u32, + pub value_size: u64, // entry 大小 + pub value_pos: u64, // entry pos, 再加上大小,就可以快速定位到整个entry + pub key: Vec, +} + +// Hint => bytes +impl Into> for Hint { + fn into(self) -> Vec { + let mut result: Vec = Vec::with_capacity(28 + self.key_size as usize); + + // 将各个字段的字节添加到结果中 + result.extend_from_slice(&self.timestamp.to_le_bytes()); + result.extend_from_slice(&self.key_size.to_le_bytes()); + result.extend_from_slice(&self.value_size.to_le_bytes()); + result.extend_from_slice(&self.value_pos.to_le_bytes()); + result.extend_from_slice(&self.key); + + result + } +} + +// | timestamp (8 bytes) | key_size (4 bytes) | value_size (8 bytes) | value_pos (8 bytes) | key | +pub struct HintFile(File); + +impl HintFile { + pub fn new(file: File) -> HintFile { + HintFile(file) + } +} + +fn format_bytes_as_str(bytes: &[u8]) -> String { + match std::str::from_utf8(bytes) { + Ok(s) => s.to_string(), + Err(_) => "[invalid UTF-8]".to_string(), + } +} +impl fmt::Debug for Hint { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Hint {{ timestamp: {}, key_size: {}, value_size: {}, value_pos: {:?}, value: {:?} }}", + self.timestamp, + self.key_size, + self.value_size, + self.value_pos, + format_bytes_as_str(&self.key), + ) + } +} + +impl Into> for HintFile { + fn into(mut self) -> Vec { + let mut buffer = vec![0; 28]; + let mut hits_result = Vec::new(); + loop { + // crc = 4 | time = 8 | ksize = 4 | vsize = 8 + match self.0.read_exact(&mut buffer) { + Ok(()) => { + // 读取成功,处理 buffer 的内容 + // println!("Successfully read {} bytes", buffer.len()); + // 处理 buffer 内容的其他操作 + } + Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof => { + // 文件结束EOF,处理不完整数据 + println!("Reached end of file"); + break; // 退出循环 + } + Err(e) => { + // 处理其他 I/O 错误 + eprintln!("Error reading file: {}", e); + break; // 退出循环 + } + } + + let timestamp = u64::from_le_bytes( + buffer[0..8] + .try_into() + .map_err(|_| "Invalid timestamp data") + .unwrap(), + ); + let key_size 
= u32::from_le_bytes( + buffer[8..12] + .try_into() + .map_err(|_| "Invalid key_size data") + .unwrap(), + ); + let value_size = u64::from_le_bytes( + buffer[12..20] + .try_into() + .map_err(|_| "Invalid value_size data") + .unwrap(), + ); + + // value_pos + let value_pos = u64::from_le_bytes( + buffer[20..28] + .try_into() + .map_err(|_| "Invalid value_size data") + .unwrap(), + ); + + // key 再次读取指定长度的字节 + let mut key = vec![0; key_size as usize]; + self.0.read_exact(&mut key).unwrap(); + + hits_result.push(Hint { + timestamp, + key_size, + value_size, + value_pos, + key, + }) + } + + hits_result + } +} + +#[cfg(test)] +pub mod tests { + use crate::entry::entry::Entry; + use std::io::Write; + use tempfile::NamedTempFile; + + #[test] + fn hit_write_and_read() -> anyhow::Result<()> { + use crate::entry::hint::{Hint, HintFile}; + + let mut tmpfile = NamedTempFile::new()?; + + let mut offset = 0; + for num in 1..=2 { + let key: Vec = num.to_string().into_bytes(); + let value: Vec = (num * 2).to_string().into_bytes(); + + // let key = "a".as_bytes().to_vec(); + // let value = "1".as_bytes().to_vec(); + let entry = Entry::new(key.clone(), value, 0); + + let ht = Hint { + timestamp: entry.timestamp, + key_size: entry.key_size, + value_pos: offset, + value_size: entry.size() as u64, + key: key.clone(), + }; + offset += entry.size() as u64; + + let config_content: Vec = ht.into(); + tmpfile.write_all(&config_content)?; + } + + // let mut tmpfile = NamedTempFile::new()?; + // let config_content: Vec = ht.into(); + // tmpfile.write_all(&config_content)?; + + let f = std::fs::File::open(tmpfile.path().to_str().unwrap())?; + let the_file = HintFile::new(f); + let result: Vec = HintFile::into(the_file); + assert_eq!(2, result.len()); + + let mut offset: u64 = 0; + for item in result { + assert_eq!(offset, item.value_pos); + offset += item.value_size; + println!("{:?}", item); + } + + Ok(()) + } +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 
0000000..a91e735 --- /dev/null +++ b/src/error.rs @@ -0,0 +1 @@ +pub mod error; diff --git a/src/error/error.rs b/src/error/error.rs new file mode 100644 index 0000000..118c46d --- /dev/null +++ b/src/error/error.rs @@ -0,0 +1,15 @@ +use std::fmt; + +#[derive(Debug)] +pub enum OpError { + KeyNotFound, + ReadSizeNotMatch, + ValueInvalid, + LockFailed +} + +impl fmt::Display for OpError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..33ac4fb --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,10 @@ +pub mod cli; +pub mod config; +pub mod entry; +mod error; +pub mod store; +pub mod util; + +pub use crate::error::error::*; +pub use crate::store::store as db_store; +pub mod server; diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..6aca696 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,21 @@ +use log::debug; +use minkv::cli; +use serde::{Deserialize, Serialize}; + +// 派生 Serialize 和 Deserialize +#[derive(Serialize, Deserialize)] +struct Record { + id: u32, + name: String, + age: u8, +} + +fn main() { + match cli::cli::parse() { + Ok(_) => {} + Err(e) => { + debug!("{:?}", e); + println!("{:?}", e) + } + } +} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..fb17aef --- /dev/null +++ b/src/server.rs @@ -0,0 +1,873 @@ +use super::config; +use crate::db_store; +use crate::util; +use log::*; +use redis_protocol::resp2::{ + decode::decode, + encode::encode, + types::{OwnedFrame, Resp2Frame}, +}; +use std::net::TcpListener; +use std::path::PathBuf; +use std::sync::mpsc; +use std::sync::Arc; +use std::{ + io::{Read, Write}, + sync::RwLock, +}; + +pub struct Server { + config: config::Config, + store: Arc>, +} + +impl Server { + pub fn new(config: config::Config) -> Server { + let (tx, rx) = mpsc::channel(); + let srv_config = config.clone(); + let config = Arc::new(config); + let store = db_store::new_store(config, tx, 
rx); + + Server { + config: srv_config, + store: Arc::new(RwLock::new(store)), + } + } + + fn handle_client_connection(&self, mut stream: std::net::TcpStream) { + let mut buffer = [0u8; 4096]; + + loop { + // 读取客户端发送的数据 + let n = match stream.read(&mut buffer) { + Ok(size) => size, + Err(e) => { + error!("Failed to read from stream: {}", e); + return; + } + }; + + if n == 0 { + break; // 连接已关闭 + } + + // 解析 RESP 帧 + match decode(&buffer[..n]) { + Ok(Some((frame, _))) => match self.handle_frame(&frame) { + Ok(resp) => { + let mut buf = vec![0; resp.encode_len()]; + encode(&mut buf, &resp).unwrap(); + stream.write_all(&buf).unwrap(); + } + Err(e) => { + let error_message = format!("-ERR {}\r\n", e); + stream.write_all(error_message.as_bytes()).unwrap(); + } + }, + Ok(None) => { + // 数据不完整,需要更多数据 + error!("数据不完整"); + continue; + } + Err(e) => { + error!("Error decoding frame: {}", e); + return; + } + } + } + } + + fn get_command_name(&self, frame: &OwnedFrame) -> Option { + if let OwnedFrame::Array(arr) = frame { + if let Some(OwnedFrame::BulkString(bulk)) = arr.get(0) { + if let Ok(command) = std::str::from_utf8(bulk) { + return Some(command.to_string()); + } + } + } + None + } + + fn handle_frame(&self, frame: &OwnedFrame) -> Result { + let command = match self.get_command_name(frame) { + Some(cmd) => cmd, + _ => return Err("(error) ERR unknown command".to_string()), + }; + + debug!("Received command: {}", command); + + match command.to_uppercase().as_str() { + "SET" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 3 { + return Err("(error) ERR syntax error".to_string()); + } + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid SET command format".to_string()), + }; + let value = match &arr[2] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid SET command format".to_string()), + }; + + let mut store = self.store.write().unwrap(); + store.set(key, value, 0); + + 
Ok(OwnedFrame::SimpleString(b"OK".to_vec())) + } else { + Err("Invalid SET command format".to_string()) + } + } + "GET" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 2 { + return Err( + "(error) ERR wrong number of arguments for 'get' command".to_string() + ); + } + + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid GET command format".to_string()), + }; + + let store = self.store.read().unwrap(); + match store.get(key) { + Ok(val) => Ok(OwnedFrame::BulkString(val)), + Err(e) => { + debug!("get value occur error {:?}", e); + Ok(OwnedFrame::Null) + } + } + } else { + Err("Invalid GET command format".to_string()) + } + } + "DEL" => { + if let OwnedFrame::Array(arr) = frame { + // 检查参数个数,SET 命令应该有三个参数: SET, key, value + if arr.len() != 2 { + return Err("Invalid SET command format: expected 2 arguments".to_string()); + } + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid SET command format".to_string()), + }; + + let mut store = self.store.write().unwrap(); + store.delete(key); + // store.set(key.clone(), value.clone()); + Ok(OwnedFrame::SimpleString(b"OK".to_vec())) + } else { + Err("Invalid SET command format".to_string()) + } + } + "EXISTS" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() < 2 { + return Err("(error) ERR wrong number of arguments for 'exists' command" + .to_string()); + } + + let mut count: i64 = 0; + + let store = self.store.read().unwrap(); + for key in &arr[1..] 
{ + match key { + OwnedFrame::BulkString(bulk) => { + // 处理 BulkString 变体 + match store.get(bulk) { + Ok(_) => count = count + 1, + Err(_) => {} + }; + } + // 可以匹配其他变体 + _ => {} + } + } + + Ok(OwnedFrame::Integer(count)) + } else { + Err("Invalid EXISTS command format".to_string()) + } + } + "GETSET" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 3 { + return Err("(error) ERR wrong number of arguments for 'getset' command" + .to_string()); + } + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid SET command format".to_string()), + }; + let value = match &arr[2] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid GETSET command format".to_string()), + }; + + let mut store = self.store.write().unwrap(); + let old_value = match store.get(key) { + Ok(val) => Ok(OwnedFrame::BulkString(val)), + Err(_) => Ok(OwnedFrame::Null), + }; + store.set(key, value, 0); + + old_value + } else { + Err("Invalid GETSET command format".to_string()) + } + } + "MSET" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() < 2 || arr.len() % 2 != 1 { + return Err( + "(error) ERR wrong number of arguments for 'mset' command".to_string() + ); + } + // OwnedFrame::BulkString(val) + let mut store = self.store.write().unwrap(); + for i in 0..arr.len() / 2 { + let key = match &arr[i * 2 + 1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid SET command format".to_string()), + }; + + let value = match &arr[i * 2 + 2] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid SET command format".to_string()), + }; + + // set + store.set(key, value, 0); + } + + Ok(OwnedFrame::SimpleString(b"OK".to_vec())) + } else { + Err("Invalid MSET command format".to_string()) + } + } + "MGET" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() < 2 { + return Err( + "(error) ERR wrong number of arguments for 'mget' command".to_string() + ); + } + // OwnedFrame::BulkString(val) + let mut 
result = Vec::new(); + + let store = self.store.read().unwrap(); + for key in &arr[1..] { + match key { + OwnedFrame::BulkString(bulk) => { + // 处理 BulkString 变体 + match store.get(bulk) { + Ok(value) => result.push(OwnedFrame::BulkString(value)), + Err(_) => result.push(OwnedFrame::Null), + }; + } + _ => {} + } + } + + Ok(OwnedFrame::Array(result)) + } else { + Err("Invalid MGET command format".to_string()) + } + } + "APPEND" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 3 { + return Err("(error) ERR syntax error".to_string()); + } + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid APPEND command format".to_string()), + }; + let value = match &arr[2] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid APPEND command format".to_string()), + }; + + let mut store = self.store.write().unwrap(); + match store.get(key) { + Ok(mut val) => { + val.extend(value); + store.set(key, &val, 0); + Ok(OwnedFrame::Integer(val.len() as i64)) + } + Err(e) => { + debug!("get value occur error {:?}", e); + store.set(key, value, 0); + Ok(OwnedFrame::Integer(value.len() as i64)) + } + } + } else { + Err("Invalid APPEND command format".to_string()) + } + } + "INCR" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 2 { + return Err( + "(error) ERR wrong number of arguments for 'incr' command".to_string() + ); + } + + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid INCR command format".to_string()), + }; + + let mut store = self.store.write().unwrap(); + match store.get(key) { + Ok(val) => { + // convert to a number + let s = String::from_utf8_lossy(&val); + match s.parse::() { + Ok(mut n) => { + n = n + 1; + store.set(key, &n.to_string().into_bytes(), 0); + Ok(OwnedFrame::Integer(n)) + } + Err(_) => Ok(OwnedFrame::Error( + "(error) ERR value is not an integer or out of range" + .to_string(), + )), + } + } + Err(e) => { + debug!("get value occur error 
{:?}", e); + Ok(OwnedFrame::Null) + } + } + } else { + Err("Invalid INCR command format".to_string()) + } + } + "DECR" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 2 { + return Err( + "(error) ERR wrong number of arguments for 'incr' command".to_string() + ); + } + + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid DECR command format".to_string()), + }; + + let mut store = self.store.write().unwrap(); + match store.get(key) { + Ok(val) => { + // convert to a number + let s = String::from_utf8_lossy(&val); + match s.parse::() { + Ok(mut n) => { + n = n - 1; + store.set(key, &n.to_string().into_bytes(), 0); + Ok(OwnedFrame::Integer(n)) + } + Err(_) => Ok(OwnedFrame::Error( + "(error) ERR value is not an integer or out of range" + .to_string(), + )), + } + } + Err(e) => { + debug!("get value occur error {:?}", e); + Ok(OwnedFrame::Null) + } + } + } else { + Err("Invalid DECR command format".to_string()) + } + } + "INCRBY" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 3 { + return Err("(error) ERR wrong number of arguments for 'incrby' command" + .to_string()); + } + + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid INCRBY command format".to_string()), + }; + + let num_str = match &arr[2] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid INCRBY command format".to_string()), + }; + + let s = String::from_utf8_lossy(&num_str); + let value = match s.parse::() { + Ok(value) => value, + Err(_) => { + return Ok(OwnedFrame::Error( + "(error) ERR value is not an integer or out of range".to_string(), + )) + } + }; + + let mut store = self.store.write().unwrap(); + match store.get(key) { + Ok(val) => { + // convert to a number + let s = String::from_utf8_lossy(&val); + match s.parse::() { + Ok(mut n) => { + n = n + value; + store.set(key, &n.to_string().into_bytes(), 0); + Ok(OwnedFrame::Integer(n)) + } + Err(_) => 
Ok(OwnedFrame::Error( + "(error) ERR value is not an integer or out of range" + .to_string(), + )), + } + } + Err(e) => { + debug!("get value occur error {:?}", e); + Ok(OwnedFrame::Null) + } + } + } else { + Err("Invalid INCRBY command format".to_string()) + } + } + "DECRBY" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 3 { + return Err("(error) ERR wrong number of arguments for 'decrby' command" + .to_string()); + } + + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid DECRBY command format".to_string()), + }; + + let num_str = match &arr[2] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid DECRBY command format".to_string()), + }; + + let s = String::from_utf8_lossy(&num_str); + let value = match s.parse::() { + Ok(value) => value, + Err(_) => { + return Ok(OwnedFrame::Error( + "(error) ERR value is not an integer or out of range".to_string(), + )) + } + }; + + let mut store = self.store.write().unwrap(); + match store.get(key) { + Ok(val) => { + // convert to a number + let s = String::from_utf8_lossy(&val); + match s.parse::() { + Ok(mut n) => { + n = n - value; + store.set(key, &n.to_string().into_bytes(), 0); + Ok(OwnedFrame::Integer(n)) + } + Err(_) => Ok(OwnedFrame::Error( + "(error) ERR value is not an integer or out of range" + .to_string(), + )), + } + } + Err(e) => { + debug!("get value occur error {:?}", e); + Ok(OwnedFrame::Null) + } + } + } else { + Err("Invalid DECRBY command format".to_string()) + } + } + "EXPIRE" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 3 { + return Err("(error) ERR wrong number of arguments for 'expire' command" + .to_string()); + } + + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid EXPIRE command format".to_string()), + }; + + let num_str = match &arr[2] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid EXPIRE command format".to_string()), + }; + + let s = 
String::from_utf8_lossy(&num_str); + let value = match s.parse::() { + Ok(value) => value, + Err(_) => { + return Ok(OwnedFrame::Error( + "(error) ERR value is not an integer or out of range".to_string(), + )) + } + }; + if value > u64::MAX / 1000 { + return Ok(OwnedFrame::Error( + "(error) ERR invalid expire time in 'expire' command".to_string(), + )); + } + + let millisec = util::time::get_millisec_from_sec(value); + let mut store = self.store.write().unwrap(); + match store.get(key) { + Ok(val) => { + store.set(key, &val, millisec); + Ok(OwnedFrame::Integer(1)) + } + Err(e) => { + debug!("get value occur error {:?}", e); + Ok(OwnedFrame::Null) + } + } + } else { + Err("Invalid EXPIRE command format".to_string()) + } + } + "EXPIREAT" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 3 { + return Err( + "(error) ERR wrong number of arguments for 'expireat' command" + .to_string(), + ); + } + + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid EXPIREAT command format".to_string()), + }; + + let num_str = match &arr[2] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid EXPIREAT command format".to_string()), + }; + + let s = String::from_utf8_lossy(&num_str); + let value = match s.parse::() { + Ok(value) => value, + Err(_) => { + return Ok(OwnedFrame::Error( + "(error) ERR value is not an integer or out of range".to_string(), + )) + } + }; + if value > u64::MAX / 1000 { + return Ok(OwnedFrame::Error( + "(error) ERR invalid expire time in 'expireat' command".to_string(), + )); + } + + let millisec = util::time::sec_to_millisec(value); + let mut store = self.store.write().unwrap(); + match store.get(key) { + Ok(val) => { + store.set(key, &val, millisec); + Ok(OwnedFrame::Integer(1)) + } + Err(e) => { + debug!("get value occur error {:?}", e); + Ok(OwnedFrame::Null) + } + } + } else { + Err("Invalid EXPIREAT command format".to_string()) + } + } + "PEXPIRE" => { + if let OwnedFrame::Array(arr) = 
frame { + if arr.len() != 3 { + return Err( + "(error) ERR wrong number of arguments for 'pexpire' command" + .to_string(), + ); + } + + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid PEXPIRE command format".to_string()), + }; + + let num_str = match &arr[2] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid PEXPIRE command format".to_string()), + }; + + let s = String::from_utf8_lossy(&num_str); + let value = match s.parse::() { + Ok(value) => value, + Err(_) => { + return Ok(OwnedFrame::Error( + "(error) ERR value is not an integer or out of range".to_string(), + )) + } + }; + if util::time::current_milliseconds() + value > u64::MAX / 1000 { + return Ok(OwnedFrame::Error( + "(error) ERR invalid expire time in 'pexpire' command".to_string(), + )); + } + + let millisec = util::time::get_millisec(value); + let mut store = self.store.write().unwrap(); + match store.get(key) { + Ok(val) => { + store.set(key, &val, millisec); + Ok(OwnedFrame::Integer(1)) + } + Err(e) => { + debug!("get value occur error {:?}", e); + Ok(OwnedFrame::Null) + } + } + } else { + Err("Invalid PEXPIRE command format".to_string()) + } + } + "PEXPIREAT" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 3 { + return Err( + "(error) ERR wrong number of arguments for 'pexpireat' command" + .to_string(), + ); + } + + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid PEXPIREAT command format".to_string()), + }; + + let num_str = match &arr[2] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid PEXPIREAT command format".to_string()), + }; + + let s = String::from_utf8_lossy(&num_str); + let value = match s.parse::() { + Ok(value) => value, + Err(_) => { + return Ok(OwnedFrame::Error( + "(error) ERR value is not an integer or out of range".to_string(), + )) + } + }; + if value > u64::MAX / 1000 { + return Ok(OwnedFrame::Error( + "(error) ERR invalid expire time in 
'pexpireat' command".to_string(), + )); + } + + let mut store = self.store.write().unwrap(); + match store.get(key) { + Ok(val) => { + store.set(key, &val, value); + Ok(OwnedFrame::Integer(1)) + } + Err(e) => { + debug!("get value occur error {:?}", e); + Ok(OwnedFrame::Null) + } + } + } else { + Err("Invalid PEXPIREAT command format".to_string()) + } + } + "TTL" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 2 { + return Err( + "(error) ERR wrong number of arguments for 'ttl' command".to_string() + ); + } + + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid TTL command format".to_string()), + }; + + let store = self.store.write().unwrap(); + match store.get_entry(key) { + Ok(entry) => { + if entry.timestamp == 0 { + // 未设置过期时间 + Ok(OwnedFrame::Integer(-1)) + } else { + if entry.is_expired() { + Ok(OwnedFrame::Null) + } else { + Ok(OwnedFrame::Integer(util::time::get_lifetime_sec( + entry.timestamp, + ) + as i64)) + } + } + } + Err(e) => { + debug!("get value occur error {:?}", e); + Ok(OwnedFrame::Integer(-2)) + } + } + } else { + Err("Invalid TTL command format".to_string()) + } + } + "PTTL" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 2 { + return Err( + "(error) ERR wrong number of arguments for 'pttl' command".to_string() + ); + } + + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid PTTL command format".to_string()), + }; + + let store = self.store.write().unwrap(); + match store.get_entry(key) { + Ok(entry) => { + if entry.timestamp == 0 { + // 未设置过期时间 + Ok(OwnedFrame::Integer(-1)) + } else { + if entry.is_expired() { + Ok(OwnedFrame::Null) + } else { + Ok(OwnedFrame::Integer(util::time::get_lifetime_millisec( + entry.timestamp, + ) + as i64)) + } + } + } + Err(e) => { + debug!("get value occur error {:?}", e); + Ok(OwnedFrame::Integer(-2)) + } + } + } else { + Err("Invalid PTTL command format".to_string()) + } + } + "PERSIST" => { + 
if let OwnedFrame::Array(arr) = frame { + if arr.len() != 2 { + return Err( + "(error) ERR wrong number of arguments for 'persist' command" + .to_string(), + ); + } + + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid PERSIST command format".to_string()), + }; + + let mut store = self.store.write().unwrap(); + match store.get_entry(key) { + Ok(entry) => { + if entry.timestamp == 0 { + // 未设置过期时间 + Ok(OwnedFrame::Integer(0)) + } else { + store.set(key, &entry.value, 0); + Ok(OwnedFrame::Integer(1)) + } + } + Err(e) => { + debug!("get value occur error {:?}", e); + Ok(OwnedFrame::Integer(0)) + } + } + } else { + Err("Invalid PERSIST command format".to_string()) + } + } + "KEYS" => { + if let OwnedFrame::Array(arr) = frame { + if arr.len() != 2 { + return Err( + "(error) ERR wrong number of arguments for 'key' command".to_string() + ); + } + + let key = match &arr[1] { + OwnedFrame::BulkString(bulk) => bulk, + _ => return Err("Invalid KEYS command format".to_string()), + }; + + let k = String::from_utf8(key.to_vec()).unwrap(); + let store = self.store.read().unwrap(); + let mut result: Vec = Vec::new(); + for v in store.keys() { + let v1 = String::from_utf8(v.to_vec()).unwrap(); + if util::match_key(&k, &v1) { + result.push(OwnedFrame::BulkString(v.clone())); + } + } + return Ok(OwnedFrame::Array(result)); + } else { + Err("Invalid KEYS command format".to_string()) + } + } + // 其他命令处理 + _ => Err(format!("Unsupported command {:?}", command.as_str()).to_string()), + } + } + + pub fn server_start(self: Arc) { + let (address, port) = self.config.get_addr_port(); + let addr = format!("{}:{}", address, port); + let listener = TcpListener::bind(&addr).unwrap(); + + println!("Listening on {}", addr); + + for stream in listener.incoming() { + match stream { + Ok(stream) => { + debug!("New connection: {}", stream.peer_addr().unwrap()); + let server_clone = Arc::clone(&self); + + std::thread::spawn(move || { + 
server_clone.handle_client_connection(stream); + }); + } + Err(e) => { + error!("Error: {}", e); + } + } + } + } +} + +pub fn start_server(option: &Option) -> anyhow::Result<()> { + let conf = if let Some(file) = option { + config::Config::try_from(file.as_path())? + } else { + config::Config::new()? + }; + + debug!("{:?}", conf); + let srv = Server::new(conf); + let server = Arc::new(srv); + server.server_start(); + Ok(()) +} diff --git a/src/store.rs b/src/store.rs new file mode 100644 index 0000000..9b9fd4d --- /dev/null +++ b/src/store.rs @@ -0,0 +1,2 @@ +pub mod file; +pub mod store; diff --git a/src/store/file.rs b/src/store/file.rs new file mode 100644 index 0000000..65f3d99 --- /dev/null +++ b/src/store/file.rs @@ -0,0 +1,73 @@ +use super::super::entry::entry; +use std::fs::{self, File}; +use std::io::{self, BufReader, Read, Seek, SeekFrom, Write}; +use std::path::PathBuf; +use std::sync::{Arc, RwLock}; + +// 读写 +pub fn new(filepath: &PathBuf) -> io::Result { + let file = fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .append(true) + .open(filepath); + + file +} + +pub fn new_writer(filepath: &PathBuf) -> io::Result> { + let f = fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .append(true) + .open(filepath)?; + + Ok(io::BufWriter::new(f)) +} + +// readonly +pub fn open(path: &PathBuf) -> io::Result { + File::open(path) +} + +pub fn open_reader(path: &PathBuf) -> io::Result> { + let f = open(path)?; + Ok(io::BufReader::new(f)) +} + +pub fn read(file: &Arc>, offset: u64, size: u64) -> io::Result> { + let mut buf = vec![0; size as usize]; + let mut file = file.write().unwrap(); + file.seek(SeekFrom::Start(offset))?; + let _ = file.read_exact(&mut buf)?; + + Ok(buf) +} + +pub fn read_reader( + file: &Arc>>, + offset: u64, + size: u64, +) -> io::Result> { + let mut buf = vec![0; size as usize]; + let mut file = file.write().unwrap(); + file.seek(SeekFrom::Start(offset))?; + let _ = file.read_exact(&mut buf)?; + + 
Ok(buf) +} + +// return entry's position and size +pub fn append(file: &Arc>, e: entry::Entry) -> (u64, u64) { + let mut file = file.write().unwrap(); + // let mut file = file.borrow_mut(); + let pos = file.seek(SeekFrom::End(0)).unwrap(); + let size = e.size() as u64; + let buf = e.as_bytes(); + file.write_all(&buf).unwrap(); + // file.flush().unwrap(); + + (pos, size) +} diff --git a/src/store/store.rs b/src/store/store.rs new file mode 100644 index 0000000..dd22971 --- /dev/null +++ b/src/store/store.rs @@ -0,0 +1,1128 @@ +use super::file; +use crate::config::{self, Config}; +use crate::entry::entry::{self, Entry, EntryFile, EntryParseResult}; +use crate::entry::hint::{Hint, HintFile}; +use crate::util::lock; +use crate::OpError; +use chrono::Utc; +use log::*; +use std::collections::hash_map::Iter; +use std::collections::HashMap; +use std::fs::{self, File}; +use std::io::{BufReader, Write}; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc; +use std::sync::mpsc::Receiver; +use std::sync::{Arc, Mutex, RwLock}; + +pub trait Op: Send + Sync + 'static { + fn get(&self, key: &Vec) -> Result, OpError>; + fn get_entry(&self, key: &Vec) -> Result; + fn set(&mut self, key: &Vec, value: &Vec, timestamp: u64); + fn delete(&mut self, key: &Vec); + fn len(&self) -> usize; + fn keys(&self) -> Vec>; + fn compaction(&mut self); +} + +const ACTIVE_FILE_SEQ: u16 = 0; +type StFile = Arc>>; // storage File +type NotifyResult = i32; + +pub struct Store +where + // K: OpKeydir, + K: OpKeydir + Send + Sync + 'static, +{ + active_file: Arc>, + config: Arc, + keydir: Arc>, + files: Arc>>, + file_size: AtomicUsize, // 当前写入文件大小 + merge_file_num: AtomicUsize, + updated_key_num: AtomicUsize, // 更新key数量 + sender: mpsc::Sender, + receiver: Arc>>, +} + +pub fn new_store( + config: Arc, + sender: mpsc::Sender, + receiver: mpsc::Receiver, +) -> Store { + let keydir = Keydir::new(); + let mut s = Store::new(keydir, config, sender, receiver); + 
s.start(); + s +} + +fn get_active_data(filepath: PathBuf) -> Arc> { + let fd = file::new(&filepath).unwrap(); + Arc::new(RwLock::new(fd)) +} + +impl Store +where + K: OpKeydir + Send + Sync + 'static, +{ + pub fn new( + keydir: K, + conf: Arc, + sender: mpsc::Sender, + receiver: mpsc::Receiver, + ) -> Store { + // let conf = config::Config::new(); + let active_file = get_active_data(conf.get_active_filepath()); + let mut s = Store { + active_file, + config: conf, + keydir: Arc::new(RwLock::new(keydir)), + files: Arc::new(RwLock::new(HashMap::new())), + file_size: AtomicUsize::new(0), + merge_file_num: AtomicUsize::new(0), + updated_key_num: AtomicUsize::new(0), + sender, + receiver: Arc::new(Mutex::new(receiver)), + }; + s.notify(); + s + } + + fn get_filesize(&self) -> usize { + self.file_size.load(Ordering::SeqCst) + } + + // 从当前目录里读取相关文件,如果未找到任务数据文件 + pub fn start(&mut self) { + // 1. load all hits to keydir + self.load_hint_file(); + + // 2. load all datafiles + self.rebuild_from_datafile(); + self.load_active_file(); + } + + // merge + + fn rebuild_from_datafile(&mut self) { + let current_max_file_seq = self.config.get_next_datafile_seq(); + let mut keydir = Keydir::new(); + + for idx in 1..current_max_file_seq { + let mut count = 0; + + // read index from hint file + let the_file = self.config.get_filepath_by_seq(idx); + if !the_file.exists() { + debug!("data file {:?} not found!", the_file); + continue; + } else { + // 判断是否已通过 hint 文件合并 + let hint_filename = self.config.get_hint_filepath_by_seq(idx); + if hint_filename.exists() { + continue; + } + } + let file = file::open(&the_file).unwrap(); + let entry_file = EntryFile::new(file); + let entry_result: Vec = entry_file.into(); + + for entry in entry_result { + let metadata = Metadata { + file_id: idx, + value_sz: entry.entry.size() as u64, + value_pos: entry.value_pos, + tstamp: entry.entry.timestamp, + }; + + // debug!("{:?} {:?}", entry.entry, metadata); + // log replay + if entry.entry.is_expired() || 
entry.entry.is_removed() { + info!("expired or deleted {:?}", entry.entry); + keydir.remove(&entry.entry.key); + } else { + keydir.set(&entry.entry.key, metadata); + count += 1; + } + } + + // register datafile fd + let fd = file::open_reader(&the_file).unwrap(); + let mut file = self.files.write().unwrap(); + file.insert(idx, Arc::new(RwLock::new(fd))); + + debug!("加载序号 {} datafile 文件,共找到条目 {}", idx, count); + } + + debug!("共加载数据文件条目 {:?}", keydir.data.len()); + + // merge all archived datafile to keydir + let mut keydir_global = self.keydir.write().unwrap(); + keydir_global.extend(keydir.data); + } + fn load_hint_file(&mut self) { + let current_max_file_seq = self.config.get_next_datafile_seq(); + let mut total: usize = 0; + for idx in 1..current_max_file_seq { + // read index from hint file + let hint_filename = self.config.get_hint_filepath_by_seq(idx); + let archived_filename = self.config.get_filepath_by_seq(idx); + if !hint_filename.exists() || !archived_filename.exists() { + debug!("hits file {:?} not found!", hint_filename); + continue; + } + let hint_file_path = file::open(&hint_filename).unwrap(); + + let the_file = HintFile::new(hint_file_path); + let result: Vec = HintFile::into(the_file); + let len = result.len(); + + for hint in result { + let metadata = Metadata { + file_id: idx, + value_sz: hint.value_size, + value_pos: hint.value_pos, + tstamp: hint.timestamp, + }; + debug!("索引加载hint: {:?}", hint); + debug!("{:?}", metadata); + + // update keydir + let mut keydir = self.keydir.write().unwrap(); + keydir.set(&hint.key, metadata); + } + + debug!("加载序号 {} hint文件,共计 {}", current_max_file_seq, len); + + // register datafile fd + let fd = file::open_reader(&archived_filename).unwrap(); + let mut file = self.files.write().unwrap(); + file.insert(idx, Arc::new(RwLock::new(fd))); + + total += len; + } + + debug!("从hint文件加载共计 {} 项", total); + } + + fn load_active_file(&mut self) { + // 读取磁盘 active file, 主要实现从 data 文件实现索引重建 + let op_file = 
Arc::clone(&self.active_file); + let active_file = op_file.write().unwrap(); + let the_file = active_file.try_clone().unwrap(); + let entry_file = EntryFile::new(the_file); + let entry_result: Vec = entry_file.into(); + + // 临时 keydir + let mut keydir = Keydir::new(); + for entry in entry_result { + let metadata = Metadata { + file_id: ACTIVE_FILE_SEQ, + value_sz: entry.entry.size() as u64, + value_pos: entry.value_pos, + tstamp: entry.entry.timestamp, + }; + + // debug!("{:?} {:?}", entry.entry, metadata); + // log replay + if entry.entry.is_expired() || entry.entry.is_removed() { + debug!("expired or deleted {:?}", entry.entry); + keydir.remove(&entry.entry.key); + } else { + keydir.set(&entry.entry.key, metadata); + } + } + + debug!("found {} items from active file", keydir.len()); + + // merge temp keydir + let mut self_keydir = self.keydir.write().unwrap(); + self_keydir.extend(keydir.data); + } + + fn get_fd(&self, seq: u16) -> Result<(Option>>, Option), OpError> { + if seq == ACTIVE_FILE_SEQ { + return Ok((Some(Arc::clone(&self.active_file)), None)); + } + + // match self.files.get(&seq) + let files = self.files.read().unwrap(); + match files.get(&seq) { + Some(v) => Ok((None, Some(Arc::clone(v)))), + None => { + debug!("unregister fd for seq {}", seq); + Err(OpError::KeyNotFound) + } + } + } + + fn active_file_archive(&mut self) { + // 归档文件 + self.archive_file(); + + // merge archived datafiles in a new thread + self.merge_file_num.fetch_add(1, Ordering::SeqCst); + debug!( + "{} : {}", + self.merge_file_num.load(Ordering::SeqCst), + self.config.get_merge_file_num() + ); + if self.merge_file_num.load(Ordering::SeqCst) >= self.config.get_merge_file_num() { + self.merge_file_num.store(0, Ordering::SeqCst); + + // Avoiding duplicate merges caused by generating new files too quickly + if let Ok(_) = lock::Locker::acquire() { + self.sender.send(0).unwrap(); + } else { + debug!("发现 lock 文件,正在合并中..."); + } + } + } + + // rename active file to datafiles + fn 
archive_file(&mut self) { + let active_filepath = self.config.get_active_filepath(); + + let archive_file_seq = self.config.get_next_datafile_seq(); + let archive_filepath = self.config.get_filepath_by_seq(archive_file_seq); + + // write lock and flush buffer body to disk + { + let mut files = self.files.write().unwrap(); + // 1. rename active file name to archive file + let mut fd = self.active_file.write().unwrap(); + fd.flush().unwrap(); + + std::fs::rename(&active_filepath, &archive_filepath).unwrap(); + + // 2. reopen the file in read-only mode + let file = file::open_reader(&archive_filepath).unwrap(); + files.insert(archive_file_seq, Arc::new(RwLock::new(file))); + + // update keydir + let mut keydir = self.keydir.write().unwrap(); + keydir.update_key(archive_file_seq); + + debug!("archive active file => {:?}", archive_filepath); + } + + // renew active file + let archive_file_fd = file::new(&active_filepath).unwrap(); + self.active_file = Arc::new(RwLock::new(archive_file_fd)); + debug!("renew active file {:?}", active_filepath); + } + + fn notify(&mut self) { + let active_file = Arc::clone(&self.active_file); + let files = Arc::clone(&self.files); + let config = Arc::clone(&self.config); + // let self = &self.clone(); + let receiver: Arc>> = Arc::clone(&self.receiver); + let keydir = Arc::clone(&self.keydir); + + std::thread::spawn(move || { + fn get_fd( + active_file: Arc>, + files: Arc>>, + seq: u16, + ) -> Result<(Option>>, Option), OpError> { + if seq == ACTIVE_FILE_SEQ { + return Ok((Some(active_file), None)); + } + + let files = files.read().unwrap(); + match files.get(&seq) { + Some(v) => Ok((None, Some(Arc::clone(v)))), + None => { + debug!("unregister fd for seq {}", seq); + Err(OpError::KeyNotFound) + } + } + } + + for i in receiver.lock().unwrap().iter() { + println!("notify thread iter ======================= {:?}", i); + debug!("\n\n=== COMPACTION BEGIN ==="); + + let merge_dir = config.merge_dir(); + if merge_dir.exists() { + 
debug!("当前已处于工作状态 {:?}", merge_dir); + return; + } + // 创建临时合并目录 + fs::create_dir_all(merge_dir).unwrap(); + + let mut merge_file_seq: u16 = 1; + let mut merge_total_size = 0; + let mut merge_keydir = Keydir::new(); + + // merge file + let merge_filepath = config.get_merge_filepath_by_seq(merge_file_seq); + let mut merge_file_fd = file::new_writer(&merge_filepath).unwrap(); + debug!("create merge file: {:?}", merge_filepath); + + // hint file + let merge_hint_filepath = config.get_merge_hint_filepath_by_seq(merge_file_seq); + debug!("create merge hint file: {:?}", merge_hint_filepath); + let mut merge_hint_file_fd = file::new_writer(&merge_hint_filepath).unwrap(); + + let active_file_seq = config.get_next_datafile_seq(); + + // 新文件pos + let mut offset = 0; + + debug!("archive_file_seq= {:?}", active_file_seq); + for (key, metadata) in keydir + .read() + .unwrap() + .iter() + .filter(|(_, metadata)| metadata.file_id > 0) + { + let active_file = Arc::clone(&active_file); + let files = Arc::clone(&files); + let old_file_option = get_fd(active_file, files, metadata.file_id).unwrap(); + // 从原来的文件读取最新值 + let bytes_result = match old_file_option { + (None, Some(archive_file)) => { + file::read_reader(&archive_file, metadata.value_pos, metadata.value_sz) + } + _ => unreachable!(), // 理论上不可能到达这里 + }; + + match bytes_result { + Ok(bytes) => { + let body_size = bytes.len(); + // splits a new file + if merge_total_size + body_size > config.file_max_size() { + // 1. 
close current merge file fd + merge_file_fd.flush().unwrap(); + merge_hint_file_fd.flush().unwrap(); + + // 2.1 create a new merge file and hint file + merge_file_seq += 1; + let merge_filepath = + config.get_merge_filepath_by_seq(merge_file_seq); + merge_file_fd = file::new_writer(&merge_filepath).unwrap(); + debug!("[data]create merge file: {:?}", merge_filepath); + + let merge_hint_filepath = + config.get_merge_hint_filepath_by_seq(merge_file_seq); + merge_hint_file_fd = + file::new_writer(&merge_hint_filepath).unwrap(); + debug!("[hint]create merge hint file: {:?}", merge_hint_filepath); + + // 3. reset all variable for next iter + merge_total_size = 0; + offset = 0; + } + + // write merge file + merge_file_fd.write_all(bytes.as_slice()).unwrap(); + + // write hint file + let entry = match Entry::try_from(bytes.clone()) { + Ok(entry) => entry, + Err(e) => { + debug!("wrong {:?}", bytes); + debug!("metadata {:?}", metadata); + panic!("wrong {:?}", e) + } + }; + let hint_entry = Hint { + timestamp: metadata.tstamp, + key_size: entry.key_size, + value_size: entry.size() as u64, // 整个entry 大小, + value_pos: offset, // 整个entry的读取位置 + key: entry.key, + }; + debug!("write_hit {:?}", hint_entry); + let hint_bytes: Vec = hint_entry.into(); + merge_hint_file_fd.write_all(&hint_bytes).unwrap(); + + // update keydir index + let merge_metadata = Metadata { + file_id: merge_file_seq, // 新 file_seq + value_pos: offset, // 在新文件offset + value_sz: metadata.value_sz, // entry 本身大小不变 + tstamp: metadata.tstamp, + }; + + debug!("old_medatdata {:?}", metadata); + debug!("hit_metadata {:?}", merge_metadata); + + merge_keydir.data.insert(key.clone(), merge_metadata); + + // position for next write + offset += metadata.value_sz; + merge_total_size += body_size; + } + Err(_) => error!("err {:?}", metadata.file_id), + } + } + + // 刷新写盘 + merge_file_fd.flush().unwrap(); + merge_hint_file_fd.flush().unwrap(); + + // move + if merge_keydir.data.len() > 0 { + // delete old datafiles + let mut 
files = files.write().unwrap(); + for i in files.keys() { + if *i >= active_file_seq { + continue; + } + let delete_file = config.get_filepath_by_seq(*i); + debug!("deleted old data file: {:?}", delete_file); + fs::remove_file(delete_file).unwrap(); + } + // 加锁操作 + // rename merge files + for i in 1..(merge_file_seq + 1) { + // merge file + let from = config.get_merge_filepath_by_seq(i); + let to = config.get_filepath_by_seq(i); + let src_path = Path::new(&from); + let dst_path = Path::new(&to); + + debug!("{:?} => {:?} File moved successfully!", from, to); + std::fs::rename(src_path, dst_path).unwrap(); + + // hint file + { + let from = config.get_merge_hint_filepath_by_seq(i); + let to = config.get_hint_filepath_by_seq(i); + let src_path = Path::new(&from); + let dst_path = Path::new(&to); + + std::fs::rename(src_path, dst_path).unwrap(); + debug!("{:?} => {:?} File moved successfully!", from, to); + } + + // 重新打开所有文件句柄,并注册[file_id:fd] + let fd = file::open_reader(&dst_path.to_path_buf()).unwrap(); + files.insert(i, Arc::new(RwLock::new(fd))); + } + + // update keydir index + let mut keydir = keydir.write().unwrap(); + keydir.extend(merge_keydir.data); + } else { + // remove empty file + fs::remove_file(merge_filepath).unwrap(); + } + + // 删除临时合并文件 + config.merge_cleanup(); + + debug!("\n=== COMPACTION END ===\n"); + } + }); + } +} + +impl Op for Store { + // get + fn get(&self, key: &Vec) -> Result, OpError> { + let self_keydir = self.keydir.read().unwrap(); + if let Ok(metadata) = self_keydir.get(key) { + debug!("key:{:?} {:?}", key, metadata); + let op_file_option = self.get_fd(metadata.file_id).unwrap(); + + let bytes_result = match op_file_option { + (Some(active_file), None) => { + file::read(&active_file, metadata.value_pos, metadata.value_sz) + } + (None, Some(archive_file)) => { + file::read_reader(&archive_file, metadata.value_pos, metadata.value_sz) + } + _ => unreachable!(), // 理论上不可能到达这里 + }; + + // debug!( + // "file_id: {},value_pos: {}, value_sz: 
{}", + // metadata.file_id, metadata.value_pos, metadata.value_sz + // ); + // let bytes = file::read(&op_file, metadata.value_pos, metadata.value_sz); + match bytes_result { + Ok(bytes) => match Entry::try_from(bytes) { + Ok(entry) => { + debug!("{:?}", entry); + if !entry.is_valid() || entry.is_expired() || entry.is_removed() { + // remove item from key + // self.keydir.remove(key); + return Err(OpError::ValueInvalid); + } else { + return Ok(entry.value); + } + } + Err(e) => { + error!("parse Entry object failed! {:?}", e); + return Err(OpError::ValueInvalid); + } + }, + Err(e) => { + error!("read error: {:?}", e); + return Err(OpError::ValueInvalid); + } + } + } + Err(OpError::KeyNotFound) + } + + fn get_entry(&self, key: &Vec) -> Result { + let self_keydir = self.keydir.read().unwrap(); + if let Ok(metadata) = self_keydir.get(key) { + debug!("key:{:?} {:?}", key, metadata); + let op_file_option = self.get_fd(metadata.file_id).unwrap(); + + let bytes_result = match op_file_option { + (Some(active_file), None) => { + file::read(&active_file, metadata.value_pos, metadata.value_sz) + } + (None, Some(archive_file)) => { + file::read_reader(&archive_file, metadata.value_pos, metadata.value_sz) + } + _ => unreachable!(), // 理论上不可能到达这里 + }; + + // debug!( + // "file_id: {},value_pos: {}, value_sz: {}", + // metadata.file_id, metadata.value_pos, metadata.value_sz + // ); + // let bytes = file::read(&op_file, metadata.value_pos, metadata.value_sz); + match bytes_result { + Ok(bytes) => match Entry::try_from(bytes) { + Ok(entry) => { + debug!("{:?}", entry); + if entry.is_valid() { + return Ok(entry); + } else { + return Err(OpError::ValueInvalid); + } + } + Err(e) => { + error!("parse Entry object failed! 
{:?}", e); + return Err(OpError::ValueInvalid); + } + }, + Err(e) => { + error!("read error: {:?}", e); + return Err(OpError::ValueInvalid); + } + } + } + Err(OpError::KeyNotFound) + } + + // set/put + fn set(&mut self, key: &Vec, value: &Vec, timestamp: u64) { + debug!( + "set key:{:?}, value:{:?}, timestamp: {}", + key, value, timestamp + ); + let entry = entry::Entry::new(key.clone(), value.clone(), timestamp); + + // 文件大小分割 + if self.get_filesize() + entry.size() > self.config.file_max_size() { + self.active_file_archive(); + self.file_size.store(0, Ordering::SeqCst); + } else { + self.file_size.fetch_add(entry.size(), Ordering::SeqCst); + } + + // 2. 写入当前活跃文件 + let (entry_pos, entry_size) = file::append(&self.active_file, entry); + self.updated_key_num.fetch_add(1, Ordering::SeqCst); + let config_sync_keys_num = self.config.get_sync_keys_num() as usize; + if config_sync_keys_num > 0 + && self.updated_key_num.load(Ordering::SeqCst) >= config_sync_keys_num + { + // file flush + let mut active_file_fd = self.active_file.write().unwrap(); + active_file_fd.flush().unwrap(); + } + + // 3. 
update keydir + let metadata = Metadata { + file_id: ACTIVE_FILE_SEQ, + value_sz: entry_size, + value_pos: entry_pos, + tstamp: Utc::now().timestamp() as u64, + }; + let mut self_keydir = self.keydir.write().unwrap(); + self_keydir.set(key, metadata); + } + + // delete + fn delete(&mut self, key: &Vec) { + // 先检查是否存在,否则直接返回 + // self.set(key, &b"".to_vec()); + let value: Vec = vec![]; + let entry = entry::Entry::new(key.clone(), value, 0).set_removed(); + debug!("delete {:?}", entry); + self.file_size.fetch_add(entry.size(), Ordering::SeqCst); + let (_, _) = file::append(&self.active_file, entry); + let config_sync_keys_num = self.config.get_sync_keys_num() as usize; + if config_sync_keys_num > 0 + && self.updated_key_num.load(Ordering::SeqCst) >= config_sync_keys_num + { + // file flush + let mut active_file_fd = self.active_file.write().unwrap(); + active_file_fd.flush().unwrap(); + } + // delete index from keydir + let mut self_keydir = self.keydir.write().unwrap(); + self_keydir.remove(key); + } + + // len + fn len(&self) -> usize { + self.keydir.read().unwrap().len() + } + + fn keys(&self) -> Vec> { + let keydir = self.keydir.read().unwrap(); // 读锁定 + keydir.keys().into_iter().cloned().collect() + } + + fn compaction(&mut self) { + debug!("\n\n=== COMPACTION BEGIN ==="); + + let merge_dir = self.config.merge_dir(); + if merge_dir.exists() { + debug!("当前已处于工作状态 {:?}", merge_dir); + return; + } + // 创建临时合并目录 + fs::create_dir_all(merge_dir).unwrap(); + + let mut merge_file_seq: u16 = 1; + let mut merge_total_size = 0; + let mut merge_keydir = Keydir::new(); + + // merge file + let merge_filepath = self.config.get_merge_filepath_by_seq(merge_file_seq); + let mut merge_file_fd = file::new_writer(&merge_filepath).unwrap(); + debug!("create merge file: {:?}", merge_filepath); + + // hint file + let merge_hint_filepath = self.config.get_merge_hint_filepath_by_seq(merge_file_seq); + debug!("create merge hint file: {:?}", merge_hint_filepath); + let mut 
merge_hint_file_fd = file::new_writer(&merge_hint_filepath).unwrap(); + + let active_file_seq = self.config.get_next_datafile_seq(); + + // 新文件pos + let mut offset = 0; + debug!("archive_file_seq= {:?}", active_file_seq); + for (key, metadata) in self + .keydir + .read() + .unwrap() + .iter() + .filter(|(_, metadata)| metadata.file_id > 0) + { + let old_file_option = self.get_fd(metadata.file_id).unwrap(); + // 从原来的文件读取最新值 + let bytes_result = match old_file_option { + // (Some(active_file), None) => { + // // 合并操作,永远不会使用 active_file + // file::read(&active_file, metadata.value_pos, metadata.value_sz) + // } + (None, Some(archive_file)) => { + file::read_reader(&archive_file, metadata.value_pos, metadata.value_sz) + } + _ => unreachable!(), // 理论上不可能到达这里 + }; + + match bytes_result { + Ok(bytes) => { + let body_size = bytes.len(); + // splits a new file + if merge_total_size + body_size > self.config.file_max_size() { + // 1. close current merge file fd + merge_file_fd.flush().unwrap(); + merge_hint_file_fd.flush().unwrap(); + + // 2.1 create a new merge file and hint file + merge_file_seq += 1; + let merge_filepath = self.config.get_merge_filepath_by_seq(merge_file_seq); + merge_file_fd = file::new_writer(&merge_filepath).unwrap(); + debug!("[data]create merge file: {:?}", merge_filepath); + + let merge_hint_filepath = + self.config.get_merge_hint_filepath_by_seq(merge_file_seq); + merge_hint_file_fd = file::new_writer(&merge_hint_filepath).unwrap(); + debug!("[hint]create merge hint file: {:?}", merge_hint_filepath); + + // 3. 
reset all variable for next iter + merge_total_size = 0; + offset = 0; + } + + // write merge file + merge_file_fd.write_all(bytes.as_slice()).unwrap(); + + // write hint file + let entry = match Entry::try_from(bytes.clone()) { + Ok(entry) => entry, + Err(e) => { + debug!("wrong {:?}", bytes); + debug!("metadata {:?}", metadata); + panic!("wrong {:?}", e) + } + }; + let hint_entry = Hint { + timestamp: metadata.tstamp, + key_size: entry.key_size, + value_size: entry.size() as u64, // 整个entry 大小, + value_pos: offset, // 整个entry的读取位置 + key: entry.key, + }; + debug!("write_hit {:?}", hint_entry); + let hint_bytes: Vec = hint_entry.into(); + merge_hint_file_fd.write_all(&hint_bytes).unwrap(); + + // update keydir index + let merge_metadata = Metadata { + file_id: merge_file_seq, // 新 file_seq + value_pos: offset, // 在新文件offset + value_sz: metadata.value_sz, // entry 本身大小不变 + tstamp: metadata.tstamp, + }; + + debug!("old_medatdata {:?}", metadata); + debug!("hit_metadata {:?}", merge_metadata); + + merge_keydir.data.insert(key.clone(), merge_metadata); + + // position for next write + offset += metadata.value_sz; + merge_total_size += body_size; + } + Err(_) => error!("err {:?}", metadata.file_id), + } + } + + // 刷新写盘 + merge_file_fd.flush().unwrap(); + merge_hint_file_fd.flush().unwrap(); + + // move + if merge_keydir.data.len() > 0 { + // delete old datafiles + let mut files = self.files.write().unwrap(); + for i in files.keys() { + if *i >= active_file_seq { + continue; + } + let delete_file = self.config.get_filepath_by_seq(*i); + debug!("deleted old data file: {:?}", delete_file); + fs::remove_file(delete_file).unwrap(); + } + // 加锁操作 + // rename merge files + for i in 1..(merge_file_seq + 1) { + // merge file + let from = self.config.get_merge_filepath_by_seq(i); + let to = self.config.get_filepath_by_seq(i); + let src_path = Path::new(&from); + let dst_path = Path::new(&to); + + debug!("{:?} => {:?} File moved successfully!", from, to); + 
std::fs::rename(src_path, dst_path).unwrap(); + + // hint file + { + let from = self.config.get_merge_hint_filepath_by_seq(i); + let to = self.config.get_hint_filepath_by_seq(i); + let src_path = Path::new(&from); + let dst_path = Path::new(&to); + + std::fs::rename(src_path, dst_path).unwrap(); + debug!("{:?} => {:?} File moved successfully!", from, to); + } + + // 重新打开所有文件句柄,并注册[file_id:fd] + let fd = file::open_reader(&dst_path.to_path_buf()).unwrap(); + files.insert(i, Arc::new(RwLock::new(fd))); + } + + // update keydir index + let mut keydir = self.keydir.write().unwrap(); + keydir.extend(merge_keydir.data); + } else { + // remove empty file + fs::remove_file(merge_filepath).unwrap(); + } + + // 删除临时合并文件 + self.config.merge_cleanup(); + + debug!("\n=== COMPACTION END ===\n"); + } +} + +// --- keydir +pub struct Keydir { + // data: Arc>>, + data: HashMap, Metadata>, +} + +pub trait OpKeydir: Sync + Send { + fn new() -> Keydir; + fn get(&self, key: &Vec) -> Result; + + fn set(&mut self, key: &Vec, metadata: Metadata); + fn remove(&mut self, key: &Vec); + fn len(&self) -> usize; + + fn extend(&mut self, iter: I) + where + I: IntoIterator, Metadata)>; + + fn iter(&self) -> Iter, Metadata>; + + fn update_key(&mut self, file_id: u16); + fn keys(&self) -> Vec<&Vec>; +} + +impl OpKeydir for Keydir { + fn new() -> Keydir { + Keydir { + data: HashMap::new(), + } + } + + fn iter(&self) -> Iter, Metadata> { + self.data.iter() + } + fn get(&self, key: &Vec) -> Result { + self.data.get(key).cloned().ok_or(OpError::KeyNotFound) + } + + fn set(&mut self, key: &Vec, metadata: Metadata) { + self.data.insert(key.clone(), metadata); + } + + fn remove(&mut self, key: &Vec) { + self.data.remove(key); + } + + fn len(&self) -> usize { + self.data.len() + } + + fn keys(&self) -> Vec<&Vec> { + let mut result: Vec<&Vec> = Vec::with_capacity(self.data.len()); + for k in self.data.keys() { + result.push(k); + } + result + } + + // 定义 extend 方法,接收一个实现了 IntoIterator trait 的类型 + fn 
extend(&mut self, iter: I) + where + I: IntoIterator, Metadata)>, + { + self.data.extend(iter); + } + + fn update_key(&mut self, file_id: u16) { + for metadata in self.data.values_mut() { + if metadata.file_id == ACTIVE_FILE_SEQ { + metadata.file_id = file_id; + } + } + } +} + +// #[allow(dead_code)] +// fn read_file(seq: u16) { +// use std::io::{self, Read}; + +// let cfg = config::Config::new().unwrap(); +// let mut path = cfg.get_filepath_by_seq(seq); +// if seq == ACTIVE_FILE_SEQ { +// path = cfg.get_active_filepath(); +// } +// let mut the_file = file::open(&path).unwrap(); +// let mut buffer = vec![0; 24]; + +// loop { +// // crc = 4 | time = 8 | ksize = 4 | vsize = 8 +// match the_file.read_exact(&mut buffer) { +// Ok(()) => { +// //debug!("Successfully read {} bytes", buffer.len()); +// } +// Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof => { +// // 文件结束EOF,处理不完整数据 +// debug!("Reached end of file"); +// break; // 退出循环 +// } +// Err(e) => { +// // 处理其他 I/O 错误 +// error!("Error reading file: {}", e); +// break; // 退出循环 +// } +// } +// let key_size = u32::from_le_bytes( +// buffer[12..16] +// .try_into() +// .map_err(|_| "Invalid key_size data") +// .unwrap(), +// ); +// let value_size = u64::from_le_bytes( +// buffer[16..24] +// .try_into() +// .map_err(|_| "Invalid value_size data") +// .unwrap(), +// ); + +// // key:: value +// // key +// let mut key_buf = vec![0; key_size as usize]; +// the_file.read_exact(&mut key_buf).unwrap(); + +// let mut value_buf = vec![0; value_size as usize]; +// the_file.read_exact(&mut value_buf).unwrap(); + +// let mut all_bytes = +// Vec::with_capacity(buffer.len() + key_size as usize + value_size as usize); +// all_bytes.extend_from_slice(&buffer); +// all_bytes.extend(key_buf); +// all_bytes.extend(value_buf); + +// match Entry::try_from(all_bytes) { +// Ok(value) => { +// println!("{:?}", value); +// } +// Err(e) => { +// panic!("format{}", e) +// } +// } +// } +// } + +// #[allow(dead_code)] +// fn 
read_hint_file(seq: u16) { +// let cfg = config::Config::new(); +// let path = cfg.get_hint_filepath_by_seq(seq); +// let the_file = file::open(&path).unwrap(); +// // let mut buffer = vec![0; 24]; +// println!("{:?}", path); + +// let the_file = HintFile::new(the_file); +// let result: Vec = HintFile::into(the_file); +// for item in result { +// println!("{:?}", item); +// } +// } + +//------ metadata +#[derive(Clone, Debug)] +pub struct Metadata { + file_id: u16, + value_sz: u64, // entry size + value_pos: u64, // entry pos + tstamp: u64, +} + +#[cfg(test)] +mod tests { + use env_logger; + use std::sync::Once; + + static INIT: Once = Once::new(); + + #[allow(dead_code)] + fn init_logger() { + INIT.call_once(|| { + env_logger::builder() + .is_test(true) // 仅在测试时启用日志 + .filter_level(log::LevelFilter::Debug) // 设置日志级别 + .init(); + }); + } + + // #[test] + // fn store_new() { + // let store = super::Store::new(); + + // assert_eq!(store.keydir.data.len(), 0); + // } + + // #[test] + // fn store_get() { + // let store = super::new_store(); + // // store.set(key, value); + // assert!(store.get("key").is_err()); + // } + + // #[test] + // fn store_set() { + // let mut store = super::new_store(); + // store.start(); + + // let key = "language"; + // let value = "rust"; + // store.set(key, value); + // assert!(store.get(key).is_ok()); + // assert_eq!(value.to_string(), store.get(key).unwrap()); + // } + + // // #[test] + // fn store_delete() { + // let mut store = super::new_store(); + // store.start(); + + // let key = "age"; + // let value = "11"; + // store.set(key, value); + // assert_eq!("11".to_string(), store.get(key).unwrap()); + // store.delete(key); + // assert_eq!(0, store.len()) + // } + // #[test] + // fn generate_next_file() { + // init_logger(); + // use crate::config::{self, Config}; + // use rand::Rng; + // use std::path::Path; + + // // let file = Path::new("./config.toml"); + // // let c = Config::try_from(file).unwrap(); + // let (tx, rx) = 
std::sync::mpsc::channel(); + // let cnf = Config::default(); + // let store = super::new_store(cnf, tx, rx); + // // let mut store = store.write().unwrap(); + + // // store.start(); + // for num in 1..20000 { + // // let num = rand::thread_rng().gen_range(1..=20000); + // // 将数字转换为字符串,再转换为 Vec + // let key: Vec = num.to_string().into_bytes(); + // let value: Vec = (num * 2).to_string().into_bytes(); + // store.set(&key, &value, 0); + // } + // } + + // #[test] + // fn test_read_file() { + // super::read_file(0); + // } + + // #[test] + // fn test_read_hint_file() { + // super::read_hint_file(1); + // } + + // #[test] + // fn load_hint_file() { + // let mut store = super::new_store(); + // store.start(); + // } +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..b04c40f --- /dev/null +++ b/src/util.rs @@ -0,0 +1,94 @@ +use log::*; +use regex::Regex; +use std::fs; +use std::path::{Path, PathBuf}; +pub mod lock; +pub mod time; + +pub fn get_files_in_directory(path: &Path) -> Result, std::io::Error> { + // 检查目录是否存在 + if !path.exists() { + return Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "Directory does not exist", + )); + } + + // 检查路径是否是目录 + if !path.is_dir() { + debug!("文件 {:?} 是否为一个路径", path); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Path is not a directory", + )); + } + + // 读取目录下的所有条目 + let mut file_list = Vec::new(); + for entry in fs::read_dir(path)? 
{ + let entry = entry?; + let path = entry.path(); + + // 只添加文件,不包括子目录 + if path.is_file() { + file_list.push(path); + } + } + + file_list.sort(); + + Ok(file_list) +} + +pub fn filter_file(file_path: &PathBuf, prefix: &str, ext: &str) -> bool { + // 检查文件名前缀 + if let Some(file_name) = file_path.file_stem() { + if let Some(file_name_str) = file_name.to_str() { + if !file_name_str.starts_with(prefix) { + debug!("忽略文件 {:?}", file_name_str); + return false; + } + } + } + + // 检查文件扩展名 + if let Some(file_ext) = file_path.extension() { + debug!("检查datafile 文件扩展名 {:?} = {:?}", file_ext, ext); + if file_ext != ext { + return false; + } + } else { + return false; + } + + true +} + +pub fn parse_seq_from_filename(filename: &PathBuf) -> usize { + let main_filename = filename.file_stem().unwrap().to_string_lossy(); + + // 提取数字部分 + let number_str = main_filename.split('-').last().unwrap(); // 取最后一个部分 + let number: u32 = number_str.parse().unwrap(); + + println!("main_filename: {:?}", main_filename); + number as usize +} + +// Helper function to format bytes as a UTF-8 string +pub fn format_bytes_as_str(bytes: &[u8]) -> String { + match std::str::from_utf8(bytes) { + Ok(s) => s.to_string(), + Err(_) => "[invalid UTF-8]".to_string(), + } +} + +// 模仿 Redis 的 KEYS 命令,模式支持 '*' 和 '?', 实现常用正则 +pub fn match_key(search_str: &str, key: &str) -> bool { + // 将搜索字符串转换为正则表达式 + let regex_pattern = search_str.replace("*", ".*").replace("?", "."); + match Regex::new(&format!("^{}$", regex_pattern)) { + Ok(regex) => regex.is_match(key), + Err(_) => false, + } +} diff --git a/src/util/lock.rs b/src/util/lock.rs new file mode 100644 index 0000000..f15bbfb --- /dev/null +++ b/src/util/lock.rs @@ -0,0 +1,58 @@ +use fs2::FileExt; +use std::fs::{File, OpenOptions}; +use std::io::{self, Error, ErrorKind}; + +pub struct Locker(File); + +impl Locker { + pub fn acquire() -> Result { + let file_path = "lock"; + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(file_path)?; + 
+ // 尝试获取独占锁 + match file.try_lock_exclusive() { + Ok(_) => Ok(Locker(file)), + Err(_) => Err(Error::new(ErrorKind::Other, "The lock has been used")), + } + } + + pub fn release(self) { + // 锁会在文件关闭时自动释放 + drop(self.0); + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn locker_test() -> io::Result<()> { + // 清理旧的锁文件(如果存在) + let file_path = "lock"; + let _ = std::fs::remove_file(file_path); + + // 尝试获取第一个锁 + let lock = Locker::acquire(); + assert!(lock.is_ok()); + + // 尝试获取第二个锁,应该失败 + let lock2 = Locker::acquire(); + assert!(lock2.is_err()); + + // 释放第一个锁 + lock.unwrap().release(); + + // 再次尝试获取锁,应该成功 + let lock = Locker::acquire(); + assert!(lock.is_ok()); + + // 清理锁文件 + std::fs::remove_file(file_path)?; + + Ok(()) + } +} diff --git a/src/util/time.rs b/src/util/time.rs new file mode 100644 index 0000000..51bf761 --- /dev/null +++ b/src/util/time.rs @@ -0,0 +1,30 @@ +use std::time::{Duration, SystemTime}; + +// 获取当前时间的毫秒时间戳 +pub fn current_milliseconds() -> u64 { + let now = SystemTime::now(); + let duration_since_epoch = now + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or(Duration::default()); + duration_since_epoch.as_millis() as u64 +} + +pub fn get_millisec_from_sec(sec: u64) -> u64 { + current_milliseconds() + sec * 1000 +} + +pub fn get_millisec(millisec: u64) -> u64 { + current_milliseconds() + millisec +} + +pub fn sec_to_millisec(sec: u64) -> u64 { + sec * 1000 +} + +pub fn get_lifetime_sec(expire_millisec: u64) -> u64 { + (expire_millisec - current_milliseconds()) / 1000 +} + +pub fn get_lifetime_millisec(expire_millisec: u64) -> u64 { + expire_millisec - current_milliseconds() +}