From a889cf10f9f7b72abb450ea9e272713ba4e4a401 Mon Sep 17 00:00:00 2001 From: Yureka Date: Sat, 22 Jun 2024 14:19:42 +0200 Subject: [PATCH] streaming decrypt --- README.md | 4 ++-- src/main.rs | 67 ++++++++++++++++++++++++++++------------------------- 2 files changed, 37 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 771e568..a05d22a 100644 --- a/README.md +++ b/README.md @@ -65,10 +65,10 @@ First we need to gather a dump of/ all messages. In this example we use the JSON export feature of Postgres to export events from the Synapse database. Export all events: -`psql -d matrix-synapse -qAtX -c "select json_agg(t) FROM (SELECT * from event_json) t;" -o messages.json` +`psql -d matrix-synapse -qAtX -c "select row_to_json(event_json) from event_json;" -o messages.json` Export all events from specific room: -`psql -d matrix-synapse -qAtX -c "select json_agg(t) FROM (SELECT * from event_json WHERE room_id = '!dfKadcascAbtdeeJdb:example.com') t;" -o messages.json` +`psql -d matrix-synapse -qAtX -c "select row_to_json(event_json) FROM event_json WHERE room_id = '!dfKadcascAbtdeeJdb:example.com';" -o messages.json` ### Using this tool diff --git a/src/main.rs b/src/main.rs index 605db68..086bd79 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,30 +25,37 @@ fn main() { let args = Args::parse(); let keyfile_data = fs::read_to_string(args.keyfile).expect("Unable to read keyfile"); - let messagefile_data = - fs::read_to_string(args.messagefile).expect("Unable to read messagefile"); + let mut messagefile_reader = + std::io::BufReader::new(fs::File::open(args.messagefile).expect("Unable to read messagefile")); let sessionkeys = get_sessionkeys_from_json(keyfile_data); - let messages = get_messages_from_json(messagefile_data); - - let decrypted_messages = serde_json::to_string(&get_decrypted_messages(messages, sessionkeys)) - .expect("Failed serlializing finished data to json"); - - if args.output.is_some() { - let mut file = - std::fs::File::create(args.output.unwrap()).expect("Failed creating output file"); - let _ = file - .write_all(decrypted_messages.as_bytes()) - .expect("Failed writing to output file"); - } else { - println!("{}", decrypted_messages); + + let messages_stream = serde_json::Deserializer::from_reader(&mut messagefile_reader).into_iter().map(Result::unwrap); + + let mut output_file = args.output.map(|output| { + std::io::BufWriter::new(std::fs::File::create(output).expect("Failed creating output file")) + }); + + for decrypted_message in get_decrypted_messages(messages_stream, sessionkeys) { + let serialized = serde_json::to_string(&decrypted_message).expect("Failed serlializing finished data to json"); + + if let Some(ref mut out) = output_file { + out + .write_all(serialized.as_bytes()) + .expect("Failed writing to output file"); + out + .write_all(b"\n") + .expect("Failed writing to output file"); + } else { + println!("{}", serialized); + } } } fn get_decrypted_messages( - mut messages: Vec>, + messages: impl Iterator>, sessionkeys: HashMap, -) -> Vec> { - for m in messages.iter_mut() { +) -> impl Iterator> { + messages.map(move |mut m| { let message_id = m["event_id"].as_str().unwrap(); let j: HashMap = serde_json::from_str(&(m["json"].as_str().unwrap())) .expect(&format!("Error parsing message {message_id}")); @@ -67,23 +74,19 @@ fn get_decrypted_messages( let message = base64::engine::general_purpose::STANDARD_NO_PAD .decode(&content["ciphertext"].as_str().unwrap()) .unwrap(); - let decrypted_message = - serde_json::from_str(&get_decrypted_ciphertext(sessionkey, message)) - .expect("Parsing decrypted message failed"); - m.insert("content_decrypted".to_string(), decrypted_message); + if let Some(decrypted_message) = get_decrypted_ciphertext(sessionkey, message) { + m.insert("content_decrypted".to_string(), serde_json::from_str(&decrypted_message).expect("Parsing decrypted message failed")); + } else { + eprintln!("Message {message_id}: Decrypting message failed unexpectedly"); + } } else { eprintln!("Message {message_id}: No matching key found, skipping"); } } else { eprintln!("Message {message_id}: No encrypted payload, skipping"); } - } - - messages -} - -fn get_messages_from_json(messages: String) -> Vec> { - serde_json::from_str(&messages).unwrap() + m + }) } fn get_sessionkeys_from_json(keys_raw: String) -> HashMap { @@ -100,7 +103,7 @@ fn get_sessionkeys_from_json(keys_raw: String) -> HashMap { }) } -fn get_decrypted_ciphertext(sessionkey: Vec, ciphertext: Vec) -> String { +fn get_decrypted_ciphertext(sessionkey: Vec, ciphertext: Vec) -> Option { let session_key = vodozemac::megolm::ExportedSessionKey::from_bytes(&sessionkey).unwrap(); let mut session = vodozemac::megolm::InboundGroupSession::import( &session_key, @@ -109,7 +112,7 @@ fn get_decrypted_ciphertext(sessionkey: Vec, ciphertext: Vec) -> String let decrypted = session .decrypt(&vodozemac::megolm::MegolmMessage::from_bytes(&ciphertext).unwrap()) - .expect("Decrypting message failed unexpectedly"); + .ok()?; - String::from_utf8(decrypted.plaintext).unwrap() + Some(String::from_utf8(decrypted.plaintext).unwrap()) }