diff --git a/src/binlog.rs b/src/binlog.rs index 03ed607..c80320f 100644 --- a/src/binlog.rs +++ b/src/binlog.rs @@ -270,16 +270,23 @@ impl ColumnType { }, Self::DATETIME=>{ let (i, dt) = take_bytes(input, 5usize)?; - let year_month = read_bits(dt, 1, 17); - let year = year_month / 13; - let month=year_month % 13; - let day = read_bits(dt, 18, 5); - let hour = read_bits(dt, 23, 5); - let minute = read_bits(dt, 28, 5); - let second = read_bits(dt, 34, 5); + let val = u64::from_be_bytes([0u8, 0u8, 0u8, dt[0], dt[1], dt[2], dt[3], dt[4]]) - 0x8000000000; + let d_val = val >> 17; + let t_val = val % (1 << 17); + let year = ((d_val >> 5) / 13) as u32; + let month = ((d_val >> 5) % 13) as u32; + let day = (d_val % (1 << 5)) as u32; + let hour = ((val >> 12) % (1 << 5)) as u32; + let minute = ((t_val >> 6) % (1 << 6)) as u32; + let second = (t_val % (1 << 6)) as u32; let (i, microsecond) = read_fps(i, meta.fsp.unwrap_or(0u8))?; - let datetime_str = format!("{year:04}-{month:02}-{day:02} {hour:02}:{minute:02}:{second:02}.{microsecond:03}"); - Ok((i, Value::from(datetime_str))) + if meta.fsp.unwrap_or(0u8) > 0u8 { + let datetime_str = format!("{year:04}-{month:02}-{day:02} {hour:02}:{minute:02}:{second:02}.{microsecond:03}"); + Ok((i, Value::from(datetime_str))) + }else{ + let datetime_str = format!("{year:04}-{month:02}-{day:02} {hour:02}:{minute:02}:{second:02}"); + Ok((i, Value::from(datetime_str))) + } }, Self::YEAR=>{ let (i, dt) = take_int1(input)?; @@ -835,10 +842,11 @@ pub fn decode_column_vals<'a>(mut table_map: TableMap, input: &'a [u8], table_id let null_map = compute_null_map(null_map1, metas.len()); let mut i = ip; for (idx, mut col_type) in &mut table_map.mapping.get_mut(&table_id).unwrap().iter_mut().enumerate() { + let meta = metas[idx].clone(); if null_map[idx] { + values.push(Value::from(None::)); continue; } - let meta = metas[idx].clone(); let (new_i, val) = ColumnType::decode_val(col_type.clone(), i, &meta)?; i = new_i; values.push(val.clone()); diff 
--git a/src/executor.rs b/src/executor.rs index 3f9aff9..4df7d60 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -11,7 +11,7 @@ use crate::message_queue::{MessageQueues, QueueMessage}; use crate::mysql::{Decoder, MySQLConnection}; use std::sync::mpsc::{channel, Sender, Receiver}; use nom::AsBytes; -use crate::binlog::{DeleteRowEvent, EventRaw, TableMap, TableMapEvent, UpdateRowEvent, WriteRowEvent}; +use crate::binlog::{ColMeta, DeleteRowEvent, EventRaw, TableMap, TableMapEvent, UpdateRowEvent, WriteRowEvent}; fn current_ms_ts() -> u128 { let now = SystemTime::now(); @@ -105,60 +105,188 @@ pub struct DmlMessage { } impl DmlMessage { + fn format_val(val: &Value) -> String { + if val.is_null() { + "null".to_string() + }else{ + if val.is_string(){ + format!("{}", val.to_string()) + }else{ + format!("\"{}\"", val.to_string()) + } + } + } + + fn format_json(&mut self, fields: &mut Vec) -> String { + let mut buffer:Vec = Vec::new(); + buffer.push("{".to_string()); + buffer.push("\"data\":[".to_string()); + let data_count = &self.data.len(); + for (i, record) in self.data.iter_mut().enumerate(){ + buffer.push("{".to_string()); + let mut key_buffer:Vec = Vec::new(); + for (idx, meta) in fields.iter().enumerate(){ + let val = record.get(&meta.name).unwrap(); + let data_value = format!("\"{}\":{}", &meta.name, Self::format_val(val)); + key_buffer.push(data_value); + } + let dict_inner = key_buffer.join(","); + buffer.push(dict_inner); + if i != (*data_count - 1usize){ + buffer.push("},".to_string()); + }else{ + buffer.push("}".to_string()); + } + } + buffer.push("],".to_string()); + buffer.push(format!("\"database\":\"{}\",", &self.database)); + buffer.push(format!("\"es\":{},", self.es)); + buffer.push(format!("\"id\":{},", self.id)); + buffer.push("\"isDdl\": false,".to_string()); + buffer.push("\"mysqlType\":{".to_string()); + for (idx, meta) in fields.iter().enumerate(){ + let tp = self.mysqlType.get(&meta.name).unwrap(); + if idx != (fields.len() - 1usize){ + 
buffer.push(format!("\"{}\":\"{}\",", meta.name, tp)); + }else{ + buffer.push(format!("\"{}\":\"{}\"", meta.name, tp)); + } + } + buffer.push("},".to_string()); + if self.old.is_some(){ + buffer.push("\"old\":[".to_string()); + let old_count = self.old.clone().unwrap().len(); + for (i, record) in self.old.clone().unwrap().iter().enumerate(){ + buffer.push("{".to_string()); + let mut key_buffer:Vec = Vec::new(); + for (idx, meta) in fields.iter().enumerate(){ + let val = &record.get::(&meta.name); + if let Some(val_ok) = val { + let val_str = Self::format_val(val_ok); + key_buffer.push(format!("\"{}\":{}", meta.name, val_str)); + } + } + let inner_str = key_buffer.join(","); + buffer.push(inner_str); + if i != (old_count - 1usize){ + buffer.push("},".to_string()); + }else{ + buffer.push("}".to_string()); + } + } + buffer.push("],".to_string()); + } + if self.pkNames.is_none() { + buffer.push("\"pkNames\":null,".to_string()); + }else{ + buffer.push("\"pkNames\":[".to_string()); + for (idx, name) in self.pkNames.clone().unwrap().iter().enumerate() { + if idx != (self.pkNames.clone().unwrap().len() - 1usize){ + buffer.push(format!("\"{name}\",")); + }else{ + buffer.push(format!("\"{name}\"")); + } + } + buffer.push("],".to_string()) + } + buffer.push("\"sql\":\"\",".to_string()); + buffer.push("\"sqlType\":{".to_string()); + for (idx, meta) in fields.iter().enumerate(){ + let sqlType = self.sqlType.get::(&meta.name).unwrap(); + if idx!= (fields.len() - 1usize) { + buffer.push(format!("\"{}\":{},", meta.name, sqlType)); + }else{ + buffer.push(format!("\"{}\":{}", meta.name, sqlType)); + } + } + buffer.push("},".to_string()); + buffer.push(format!("\"table\":\"{}\",", self.table)); + buffer.push(format!("\"ts\":{},", self.ts)); + buffer.push(format!("\"type\":\"{}\"", &self.r#type)); + buffer.push("}".to_string()); + buffer.join("") + } + + fn text_field_data(val: &Value) -> String{ + match val.as_array(){ + Some(s)=>{String::from_utf8_lossy(s.iter().map(|n| 
n.as_u64().unwrap() as u8).collect::>().as_slice()).to_string()}, + None=>String::from("") + } + } + + fn blob_field_data(val: &Value) -> String { + match val.as_array(){ + Some(s)=>{ String::from_utf16(s.iter().map(|n| n.as_u64().unwrap() as u16).collect::>().as_slice()).unwrap()}, + None=>String::from("") + } + } + fn from_dml(mut dml: DmlData, fields: &mut Vec) -> Self { let mut ins = Self::new(dml.id, dml.database, dml.table, dml.dml_type, dml.es); let mut pks: Vec = Vec::new(); - - for vals in dml.data.iter_mut(){ - let mut record:HashMap = HashMap::new(); - for (idx, val) in vals.iter().enumerate(){ - if let Some(meta) = fields.get_mut(idx){ - ins.mysqlType.insert(meta.name.clone(), meta.field_type.clone()); - let sql_tp = meta.get_sql_type(); - ins.sqlType.insert(meta.name.clone(), sql_tp); - if sql_tp == 2005 { - let val_s = match val.as_array(){ - Some(s)=>{String::from_utf8_lossy(s.iter().map(|n| n.as_u64().unwrap() as u8).collect::>().as_slice()).to_string()}, - None=>String::from("") - }; - record.insert(meta.name.clone(), Value::from(val_s)); - }else{ - if sql_tp == 2004 { - let val_s = match val.as_array(){ - Some(s)=>{ String::from_utf16(s.iter().map(|n| n.as_u64().unwrap() as u16).collect::>().as_slice()).unwrap()}, - None=>String::from("") - }; - record.insert(meta.name.clone(), Value::from(val_s)); - }else { - record.insert(meta.name.clone(), val.clone()); + let record_count = dml.data.len(); + let mut old_record_vec: Vec> = Vec::new(); + for record_id in 0..record_count{ + let old_vals = dml.old_data.get(record_id); + let hash_old_val = old_vals.is_none(); + let new_vals = dml.data.get(record_id).unwrap(); + + let mut record_data:HashMap = HashMap::new(); + let mut record_old:HashMap = HashMap::new(); + + for (idx, file_meta) in fields.iter_mut().enumerate(){ + let sql_tp = file_meta.get_sql_type(); + if record_id == 0 { + ins.mysqlType.insert(file_meta.name.clone(), file_meta.field_type.clone()); + ins.sqlType.insert(file_meta.name.clone(), 
sql_tp); + if file_meta.is_pk { + if !pks.contains(&file_meta.name){ + pks.insert(0, file_meta.name.clone()); } } - - if meta.is_pk { - if !pks.contains(&meta.name){ - pks.insert(0, meta.name.clone()); + } + let data_val = new_vals.get(idx).unwrap(); + let is_same = match old_vals { + Some(old_values)=>{ + let ov = old_values.get(idx).unwrap(); + ov.eq(data_val) + }, + None=>true + }; + + if sql_tp == 2005 { + let val_s = Self::text_field_data(data_val); + record_data.insert(file_meta.name.clone(), Value::from(val_s)); + if !is_same{ + let old_val = old_vals.unwrap().get(idx).unwrap(); + let old_val_s = Self::text_field_data(old_val); + record_old.insert(file_meta.name.clone(), Value::from(old_val_s)); + } + }else{ + if sql_tp == 2004 { + let val_s = Self::blob_field_data(data_val); + record_data.insert(file_meta.name.clone(), Value::from(val_s)); + if !is_same { + let old_val = old_vals.unwrap().get(idx).unwrap(); + let old_val_s = Self::blob_field_data(old_val); + record_data.insert(file_meta.name.clone(), Value::from(old_val_s)); + } + }else{ + record_data.insert(file_meta.name.clone(), data_val.clone()); + if !is_same { + let old_val = old_vals.unwrap().get(idx).unwrap(); + record_old.insert(file_meta.name.clone(), old_val.clone()); } } } } - ins.data.push(record); + ins.data.push(record_data); + old_record_vec.push(record_old); } - let mut data_old: Vec> = Vec::new(); - for vals in dml.old_data.iter_mut() { - let mut record:HashMap = HashMap::new(); - for (idx, val) in vals.iter().enumerate() { - if let Some(meta) = fields.get(idx){ - record.insert(meta.name.clone(), val.clone().take()); - } - } - data_old.push(record) - } - if data_old.len() > 0usize{ - ins.old = Some(data_old); - } - if pks.len() > 0usize{ + if pks.len() > 0{ ins.pkNames = Some(pks); } + ins.old = Some(old_record_vec); ins } @@ -201,7 +329,7 @@ impl FieldMeta{ return 4 } if self.field_type.starts_with("bigint") { - return 5; + return -5; } if self.field_type.starts_with("float") { return 7; 
@@ -221,7 +349,7 @@ impl FieldMeta{ if self.field_type.starts_with("year") { return 12; } - if self.field_type.eq("datetime") || self.field_type.eq("timestamp") { + if self.field_type.starts_with("datetime") || self.field_type.starts_with("timestamp") { return 93 } if self.field_type.starts_with("char") { @@ -236,6 +364,7 @@ impl FieldMeta{ if self.field_type.ends_with("text") { return 2005; } + println!("invalid:{:?}", self); -999 } } @@ -251,13 +380,13 @@ impl TableMetaMapping { Self{ mapping: Arc::new(Mutex::new(HashMap::new())) } } - fn update_mapping(&mut self, conn: &mut MySQLConnection, tid: u32, db: String, table: String) -> Result, ()> { + fn update_mapping(&mut self, conn: &mut MySQLConnection, tid: u32, db: String, table: String, table_map: &Vec) -> Result, ()> { loop { if let Ok(mut mp) = self.mapping.lock() { if !mp.contains_key(&tid) { let mut cols = Vec::new(); //println!("Check Mapping {tid} {db} {table} in {:?}", mp.contains_key(&tid)); - if conn.desc_table(db.clone(), table.clone(), &mut cols){ + if conn.desc_table(db.clone(), table.clone(), &mut cols, table_map){ mp.insert(tid, cols.clone()); return Ok(cols.clone()); }else{ @@ -345,7 +474,8 @@ fn worker_body(thread_id: usize, rx: Receiver, mapping: &mut TableMet if let Ok(data) = rx.recv() { let mut ports: Vec<(String, String)> = Vec::new(); let (_, tablemap) = TableMapEvent::decode(data.table_map.payload.as_slice()).expect("table map error"); - table_map.decode_columns(tablemap.header.table_id, tablemap.column_types, tablemap.column_metas.as_bytes()); + let tm = tablemap.clone(); + table_map.decode_columns(tm.header.table_id, tm.column_types, tm.column_metas.as_bytes()); let mut current_data = DmlData::new_data(tablemap.header.table_id as u32, tablemap.schema_name.clone(), tablemap.table_name.clone()); for instance in instances.iter_mut(){ if let Some((mq_name, topic)) = instance.check_if_need_a_mq(current_data.database.clone(), current_data.table.clone()) { @@ -353,7 +483,7 @@ fn 
worker_body(thread_id: usize, rx: Receiver, mapping: &mut TableMet } } if ports.len() < 1 { - //println!("未匹配到实例:{}.{}", &data.database, &data.table); + //println!("未匹配到实例:{}.{}", &current_data.database, &current_data.table); continue; } if let Some(ev) = data.row_event { @@ -380,18 +510,27 @@ fn worker_body(thread_id: usize, rx: Receiver, mapping: &mut TableMet current_data.append_data(data.seq_idx, "DELETE".to_string(), Vec::new(), old_values, ev.header.log_pos); } if vec![32u8, 31u8, 30u8].contains(&ev.header.event_type) { - if let Ok(mut meta) = mapping.update_mapping(&mut conn, current_data.table_id, current_data.database.clone(), current_data.table.clone()) { + let tm = tablemap.clone(); + if let Ok(mut meta) = mapping.update_mapping(&mut conn, + current_data.table_id, + current_data.database.clone(), + current_data.table.clone(), + &table_map.metas[&tm.header.table_id] + + ) { if meta.len() == 0usize { println!("表{}.{} 不存在", current_data.database, current_data.table); continue } - let message = DmlMessage::from_dml(current_data, &mut meta); - if let Ok(json_message) = serde_json::to_string(&message) { - //println!("Canal JSON:\n{}", &json_message); + let mut message = DmlMessage::from_dml(current_data, &mut meta); + let json_str = message.format_json(&mut meta); + if ports.len() > 0 { for (mq_name, topic) in ports { - let msg_qu = QueueMessage { topic, payload: json_message.clone(), pos }; + let msg_qu = QueueMessage { topic, payload: json_str.clone(), pos }; queue.push(&mq_name, msg_qu); } + }else{ + println!("没有可用发送端口"); } } diff --git a/src/message_queue.rs b/src/message_queue.rs index 613778c..234c404 100644 --- a/src/message_queue.rs +++ b/src/message_queue.rs @@ -77,6 +77,7 @@ fn outgiving_body(rx: Receiver, mq_ins: &mut dyn QueueClient, posM mq_ins.queue_message(&msg); update_pos(posMng.clone(), msg.pos); //println!("msg sent!"); + //println!("{}", &msg.payload); } } } diff --git a/src/mysql.rs b/src/mysql.rs index 71a8a24..4120156 100644 --- a/src/mysql.rs +++ 
b/src/mysql.rs @@ -5,6 +5,7 @@ use nom::{IResult, error, Err, bytes::{complete}, AsBytes}; use bytes::{Buf, BufMut, BytesMut}; use nom::error::{ErrorKind, Error, VerboseError, VerboseErrorKind}; use nom::Err as NomErr; +use crate::binlog::{ColMeta, TableMap}; use crate::executor::FieldMeta; use crate::protocal::{AuthSwitchReq, AuthSwitchResp, Capabilities, ColDef, ComQuery, HandshakeResponse41, HandshakeV10, OkPacket, TextResult, TextResultSet, VLenInt}; @@ -132,22 +133,32 @@ impl MySQLConnection { self.conn.write_all(&buff) } - pub fn desc_table(&mut self, db: String, table: String, col_meta: &mut Vec) -> bool { + pub fn desc_table(&mut self, db: String, table: String, col_meta: &mut Vec, table_map: &Vec) -> bool { let sql = format!("desc {db}.{table}"); //println!("{}", &sql); let query = ComQuery{query: sql}; self.write_package(0, &query); if let Ok(text_resp) = self.read_text_result_set() { - for row in text_resp.rows { + for (idx, row) in text_resp.rows.iter().enumerate() { let name = String::from_utf8_lossy(row.columns[0].as_bytes()).to_string(); let field_type = String::from_utf8_lossy(row.columns[1].as_bytes()).to_string(); let pk = String::from_utf8_lossy(row.columns[3].as_bytes()).to_string(); - let meta = FieldMeta { - name, - field_type, - is_pk: Self::check_pk(&pk), - }; - col_meta.push(meta); + if field_type.starts_with("datetime"){ + let meta = &table_map[idx]; + let meta = FieldMeta { + name, + field_type: format!("{}({})", field_type, meta.fsp.unwrap_or(0u8)), + is_pk: Self::check_pk(&pk), + }; + col_meta.push(meta); + }else { + let meta = FieldMeta { + name, + field_type, + is_pk: Self::check_pk(&pk), + }; + col_meta.push(meta); + } } true }else{