From 20d92705b30b4d53b8370b0617268b4b7b7b9e22 Mon Sep 17 00:00:00 2001 From: ipconfiger Date: Wed, 24 Jan 2024 11:59:25 +0800 Subject: [PATCH] bug fix 1. saving meta json file 2. change statistics format --- Cargo.lock | 219 ++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 3 +- src/main.rs | 3 +- src/message_queue.rs | 1 + src/position_manager.rs | 24 +++-- src/statistics.rs | 8 +- 6 files changed, 239 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3f50cf8..b7bc5fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,21 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "async-trait" version = "0.1.77" @@ -81,6 +96,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bumpalo" +version = "3.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" + [[package]] name = "byteorder" version = "1.5.0" @@ -108,6 +129,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41daef31d7a747c5c847246f36de49ced6f7403b4cdabc807a97b5cc184cda7a" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets 0.52.0", +] + [[package]] name = "clap" version = "3.2.25" @@ -155,6 +190,12 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + [[package]] name = "cpufeatures" version = "0.2.12" @@ -313,6 +354,29 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "iana-time-zone" +version = "0.1.59" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.5.0" @@ -349,6 +413,15 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +[[package]] +name = "js-sys" +version = "0.3.67" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "libc" version = "0.2.152" @@ -414,6 +487,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "num-traits" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +dependencies = [ + "autocfg", +] + [[package]] name = "num_enum" version = "0.5.11" @@ -621,6 +703,7 @@ dependencies = [ "bitflags 2.4.1", "byteorder", "bytes", + "chrono", "clap", "dirs", "hex", @@ -916,6 +999,60 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.48", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" + [[package]] name = "winapi" version = "0.3.9" @@ -947,13 +1084,22 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", ] [[package]] @@ -962,13 +1108,28 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", ] [[package]] @@ -977,42 +1138,84 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winnow" version = "0.5.34" diff --git a/Cargo.toml b/Cargo.toml index 4656e2d..0cef867 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,4 +19,5 @@ dirs = "3.0.2" rdkafka = { version = "0.33.2", default-features = false, features = ["cmake-build"] } redis = { version = "0.23", features = ["tokio-comp"] } hex = "0.4" -byteorder = "1.4.3" \ No newline at end of file +byteorder = "1.4.3" +chrono = "0.4" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 8bdb1ec..7d8bee4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -76,6 +76,7 @@ fn serve(cfg_path: &String) { //println!("text result is :{:?}", text_resp); let (file, pos) = check_valid_pos(posMng.clone(), text_resp, config.from_start.is_some_and(|b|b)); println!("{file} {pos}"); + update_name_pos(posMng.clone(), &file, pos); let dump = ComBinLogDump { pos, @@ -92,7 +93,7 @@ fn serve(cfg_path: &String) { let mut statistics= Statistics::new(); loop { let (_, buf) = conn.read_package::>().unwrap(); - statistics.feed_bytes(buf.payload.len()); + statistics.feed_bytes(seq_idx, buf.payload.len()); let event_result = EventRaw::decode(buf.payload.as_bytes()); if let Ok((_, ev)) = event_result { if ev.header.event_type == 19 { diff --git a/src/message_queue.rs b/src/message_queue.rs index aeafda5..613778c 100644 --- a/src/message_queue.rs +++ b/src/message_queue.rs @@ -76,6 +76,7 @@ fn outgiving_body(rx: Receiver, mq_ins: &mut dyn QueueClient, posM if let Ok(msg) = rx.recv() { mq_ins.queue_message(&msg); update_pos(posMng.clone(), msg.pos); + //println!("msg sent!"); } } } diff --git a/src/position_manager.rs b/src/position_manager.rs index cf2e7f2..a7f4f7d 100644 --- a/src/position_manager.rs +++ b/src/position_manager.rs @@ -55,7 +55,10 @@ impl PositionMng { pub fn load_from_file(p: Arc>) -> bool { let abs_path = get_abs_path("~/.ru_cdc/meta.json".to_string()); let position = match read_file_content(abs_path) { - Ok(s)=>serde_json::from_str::(s.as_str()), + Ok(s)=> { + println!("load meta:{}", &s); + serde_json::from_str::(s.as_str()) + }, Err(err)=>{ println!("读取索引meta失败:{err:?}"); return false; @@ -111,20 +114,27 @@ fn read_from_row(row: &TextResult) -> (String, u32) { pub fn check_valid_pos(p: Arc>, rd: TextResultSet, from_start: bool) -> (String, u32) { let record_count = rd.rows.len(); loop { - if let Ok(mut p) = p.lock() { - if p.loaded { + if let Ok(mut pm) = p.lock() { + if pm.loaded { //如果加载了状态文件,,就检测 from_start标识是否强制覆盖 if from_start{ - return (read_from_row(&rd.rows[0]).0, 4); + let log_name = read_from_row(&rd.rows[0]).0; + println!("已加载meta file,从头读:{} 4", &log_name); + return (log_name, 4); }else{ - return (p.binlog.clone(), p.position); + println!("已加载meta file, 从加载位置开始:{} {}", &pm.binlog, pm.position); + return (pm.binlog.clone(), pm.position); } }else{ // 如果没有状态文件,就要根据 from_start标识来判断是从头加载还是加载最后一段 if from_start { - return (read_from_row(&rd.rows[0]).0, 4); + let log_name = read_from_row(&rd.rows[0]).0; + println!("未加载meta file,从头读:{} 4", &log_name); + return (log_name, 4); }else{ - return read_from_row(&rd.rows[record_count-1]); + let meta_record = read_from_row(&rd.rows[record_count-1]); + println!("未加载meta file, 从最新索引开始:{} {}", &meta_record.0, meta_record.1); + return meta_record; } } }else{ diff --git a/src/statistics.rs b/src/statistics.rs index 566381e..4fb299e 100644 --- a/src/statistics.rs +++ b/src/statistics.rs @@ -1,4 +1,5 @@ use crate::executor::current_ts; +use chrono::Local; pub struct Statistics{ pub all_bytes: u128, @@ -14,18 +15,21 @@ impl Statistics { last_checkpoint: 0, } } - pub fn feed_bytes(&mut self, bytes_count: usize) { + pub fn feed_bytes(&mut self, seq_idx:u64, bytes_count: usize) { let ts = current_ts() / 1000; self.all_bytes += bytes_count as u128; if self.last_checkpoint > 0 { if ts - self.last_checkpoint > 5 { + let local_time = Local::now(); let rest_bytes = self.all_bytes - self.check_bytes; let time_used = ts - self.last_checkpoint; let byte_rate = rest_bytes / time_used as u128; let mb_rate = byte_rate as f64 / (1024f64 * 1024f64); + let total = self.all_bytes as f64 / (1024f64 * 1024f64); self.check_bytes = self.all_bytes; self.last_checkpoint = ts; - println!("当前速率:{mb_rate:0.2} MB/s"); + let dts = local_time.format("%Y/%m/%d %H:%M:%S"); + println!("{dts} |=> 处理包计数:{seq_idx},总流量:{total:02}MB 当前速率:{mb_rate:0.2} MB/s"); } }else{ self.last_checkpoint = ts;