diff --git a/services/src/ot.rs b/services/src/ot.rs index 61e4d69..4706ad2 100644 --- a/services/src/ot.rs +++ b/services/src/ot.rs @@ -18,9 +18,11 @@ use crate::{client::ClientInfo, fs_watcher::FSWatcher, FSEvent, IPCMessage, Sess use super::traits; use anyhow::{format_err, Result}; use async_trait::async_trait; +use crc32fast::Hasher; +use goval::{ot_op_component::OpComponent, OtOpComponent}; use ropey::Rope; use similar::TextDiff; -use tokio::fs; +use tokio::{fs, io::AsyncWriteExt}; use tracing::{debug, error, trace, warn}; impl OT { @@ -122,40 +124,6 @@ impl traits::Service for OT { self.watcher.watch(vec![path])?; - // let mut reader = self.watcher.get_event_reader().await; - // let sending_map = self._sending_map.clone(); - // let file_path = self.path.clone(); - // let crc32 = self.crc32.clone(); - // let contents = self.contents.clone(); - // let version = self.version.clone(); - // let history = self.history.clone(); - // let channel_id = info.id.clone(); - // tokio::spawn(async move { - // loop { - // let res = async { - // match reader.recv().await { - // Ok(res) => { - - // LoopControl::Cont(Ok(())) - // } - // Err(err) => match err { - // RecvError::Closed => LoopControl::Break, - // RecvError::Lagged(ammount) => { - // warn!(messages = ammount; "FSEvents lagged"); - // LoopControl::Cont(Ok(())) - // } - // }, - // } - // } - // .await; - - // match res { - // LoopControl::Break => break, - // LoopControl::Cont(result) => result.expect("TODO: deal with this"), - // } - // } - // }); - return Ok(Some(link_response)); } @@ -208,9 +176,7 @@ impl traits::Service for OT { } } - let to_write = self.contents.to_string(); self.version += 1; - // drop(version); let user_id; if ot.author == goval::ot_packet::Author::Ghostwriter as i32 { @@ -221,7 +187,27 @@ impl traits::Service for OT { user_id = 23_054_564; // https://replit.com/@homeval-user } - let crc32 = crc32fast::hash(to_write.as_bytes()); + let mut crc32_hasher = Hasher::new(); //crc32fast::hash(to_write.as_bytes()); + let mut file_writer = fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&self.path) + .await?; + + for chunk in self.contents.chunks() { + let bytes = chunk.as_bytes(); + + crc32_hasher.update(bytes); + file_writer.write_all(bytes).await?; + } + + file_writer.flush().await?; + file_writer.sync_data().await?; + + drop(file_writer); + + let crc32 = crc32_hasher.finalize(); self.crc32 = crc32; let committed = Some(prost_types::Timestamp { @@ -249,8 +235,6 @@ impl traits::Service for OT { info.send(ot_notif, crate::SendSessions::Everyone)?; - fs::write(&self.path, to_write).await?; - let ok = goval::Command { body: Some(goval::command::Body::Ok(goval::Ok {})), ..Default::default() @@ -337,7 +321,20 @@ impl traits::Service for OT { let new_contents = String::from_utf8(new_contents).expect("TODO: Deal with this"); + // let new_contents_str: &str = &new_contents; + // let new_self_contents = new_contents_str.into(); + let ops = diff(&self.contents.to_string(), &new_contents); + // let ops = vec![ + // OtOpComponent { + // op_component: Some(OpComponent::Delete( + // self.contents.len_chars() as u32 + // )), + // }, + // OtOpComponent { + // op_component: Some(OpComponent::Insert(new_contents)), + // }, + // ]; self.contents = new_contents.into(); self.crc32 = new_crc32; @@ -427,7 +424,7 @@ impl traits::Service for OT { fn diff(old_text: &str, new_text: &str) -> Vec { let mut differ_config: similar::TextDiffConfig = TextDiff::configure(); - let differ = differ_config.timeout(Duration::from_secs(1)); + let differ = differ_config.timeout(Duration::from_secs_f32(0.3)); let diff = differ.diff_chars(old_text, new_text); let mut parts: Vec = vec![];