Skip to content

Commit

Permalink
async import
Browse files Browse the repository at this point in the history
  • Loading branch information
codabrink committed Jan 15, 2025
1 parent 2ac3cde commit 0c538c9
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 32 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ web-time = "1.1"
# - https://github.com/seanmonstar/reqwest/issues/2159
# - https://github.com/hyperium/tonic/pull/1974
# - https://github.com/rustls/rustls-platform-verifier/issues/58
async-compression = { default-features = false, version = "0.4", features = [
"tokio",
"zstd",
] }
bincode = "1.3"
console_error_panic_hook = "0.1"
const_format = "0.2"
Expand Down
2 changes: 2 additions & 0 deletions bindings_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub enum GenericError {
Identity(#[from] xmtp_mls::identity::IdentityError),
#[error(transparent)]
JoinError(#[from] tokio::task::JoinError),
#[error(transparent)]
IoError(#[from] tokio::io::Error),
}

#[derive(uniffi::Error, thiserror::Error, Debug)]
Expand Down
37 changes: 35 additions & 2 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use xmtp_id::{
},
InboxId,
};
use xmtp_mls::groups::device_sync::backup::BackupOptions;
use xmtp_mls::groups::device_sync::backup::{BackupImporter, BackupOptions};
use xmtp_mls::groups::device_sync::preference_sync::UserPreferenceUpdate;
use xmtp_mls::groups::scoped_client::LocalScopedGroupClient;
use xmtp_mls::groups::HmacKey;
Expand Down Expand Up @@ -50,7 +50,7 @@ use xmtp_mls::{
},
AbortHandle, GenericStreamHandle, StreamHandle,
};
use xmtp_proto::xmtp::device_sync::BackupElementSelection;
use xmtp_proto::xmtp::device_sync::{BackupElementSelection, BackupMetadata};
use xmtp_proto::xmtp::mls::message_contents::content_types::ReactionV2;
use xmtp_proto::xmtp::mls::message_contents::{DeviceSyncKind, EncodedContent};
pub type RustXmtpClient = MlsClient<TonicApiClient>;
Expand Down Expand Up @@ -569,6 +569,31 @@ impl FfiXmtpClient {

Ok(())
}

pub async fn metadata(&self, path: String) -> Result<FfiBackupMetadata, GenericError> {
let file = tokio::fs::File::open(path).await?;
let importer = BackupImporter::open(file).await?;
Ok(importer.metadata.into())
}
}
#[derive(uniffi::Record)]
pub struct FfiBackupMetadata {
backup_version: u32,
elements: Vec<FfiBackupElementSelection>,
exported_at_ns: i64,
start_ns: Option<i64>,
end_ns: Option<i64>,
}
impl From<BackupMetadata> for FfiBackupMetadata {
fn from(value: BackupMetadata) -> Self {
Self {
backup_version: value.backup_version,
elements: value.elements().into_iter().map(Into::into).collect(),
start_ns: value.start_ns,
end_ns: value.end_ns,
exported_at_ns: value.exported_at_ns,
}
}
}

#[derive(uniffi::Record)]
Expand Down Expand Up @@ -600,6 +625,14 @@ impl From<FfiBackupElementSelection> for BackupElementSelection {
}
}
}
impl From<BackupElementSelection> for FfiBackupElementSelection {
fn from(value: BackupElementSelection) -> Self {
match value {
BackupElementSelection::Consent => Self::Consent,
BackupElementSelection::Messages => Self::Messages,
}
}
}

impl From<HmacKey> for FfiHmacKey {
fn from(value: HmacKey) -> Self {
Expand Down
1 change: 1 addition & 0 deletions xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ update-schema = ["toml"]

[dependencies]
aes-gcm = { version = "0.10.3", features = ["std"] }
async-compression.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bincode.workspace = true
Expand Down
19 changes: 9 additions & 10 deletions xmtp_mls/src/groups/device_sync/backup.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use std::{path::Path, sync::Arc};

use super::DeviceSyncError;
use crate::storage::xmtp_openmls_provider::XmtpOpenMlsProvider;
use backup_exporter::BackupExporter;
use std::{path::Path, sync::Arc};
use thiserror::Error;
use xmtp_common::time::now_ns;
use xmtp_proto::xmtp::device_sync::{BackupElementSelection, BackupMetadata};

pub use backup_importer::BackupImporter;

// Increment on breaking changes
const BACKUP_VERSION: u32 = 0;

mod backup_exporter;
mod backup_importer;
mod export_stream;

use crate::storage::xmtp_openmls_provider::XmtpOpenMlsProvider;

use super::DeviceSyncError;

#[derive(Debug, Error)]
pub enum BackupError {
#[error("Missing metadata")]
Expand Down Expand Up @@ -90,14 +89,14 @@ mod tests {
let mut exporter = BackupExporter::new(opts, &alix_provider);
let path = Path::new("archive.zstd");
let _ = std::fs::remove_file(path);
exporter.write_to_file(&path).unwrap();
exporter.write_to_file(path).unwrap();

let alix2_wallet = generate_local_wallet();
let alix2 = ClientBuilder::new_test_client(&alix2_wallet).await;
let alix2_provider = Arc::new(alix2.mls_provider().unwrap());

let file = File::open(path).unwrap();
let mut importer = BackupImporter::open(file).unwrap();
importer.insert(&alix2_provider).unwrap();
let file = tokio::fs::File::open(path).await.unwrap();
let mut importer = BackupImporter::open(file).await.unwrap();
importer.insert(&alix2_provider).await.unwrap();
}
}
45 changes: 25 additions & 20 deletions xmtp_mls/src/groups/device_sync/backup/backup_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,26 @@ use crate::{
},
Store, XmtpOpenMlsProvider,
};
use async_compression::tokio::bufread::ZstdDecoder;
use prost::Message;
use std::io::{BufReader, Read};
use std::pin::Pin;
use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, BufReader};
use xmtp_proto::xmtp::device_sync::{backup_element::Element, BackupElement, BackupMetadata};
use zstd::stream::Decoder;

use super::BackupError;

pub(super) struct BackupImporter<'a> {
pub struct BackupImporter {
decoded: Vec<u8>,
decoder: Decoder<'a, BufReader<Box<dyn Read>>>,
metadata: BackupMetadata,
decoder: ZstdDecoder<Pin<Box<dyn AsyncBufRead + Send>>>,
pub metadata: BackupMetadata,
}

impl<'a> BackupImporter<'a> {
pub fn open(reader: impl Read + 'static) -> Result<Self, DeviceSyncError> {
let reader = Box::new(reader) as Box<_>;
let decoder = Decoder::new(reader)?;
impl BackupImporter {
pub async fn open(reader: impl AsyncRead + Send + 'static) -> Result<Self, DeviceSyncError> {
let reader = BufReader::new(reader);
let reader = Box::pin(reader) as Pin<Box<_>>;
let decoder = ZstdDecoder::new(reader);

let mut importer = Self {
decoder,
decoded: vec![],
Expand All @@ -31,7 +34,7 @@ impl<'a> BackupImporter<'a> {

let Some(BackupElement {
element: Some(Element::Metadata(metadata)),
}) = importer.next_element()?
}) = importer.next_element().await?
else {
return Err(BackupError::MissingMetadata)?;
};
Expand All @@ -40,11 +43,11 @@ impl<'a> BackupImporter<'a> {
Ok(importer)
}

fn next_element(&mut self) -> Result<Option<BackupElement>, StorageError> {
async fn next_element(&mut self) -> Result<Option<BackupElement>, StorageError> {
let mut buffer = [0u8; 1024];
let mut element_len = 0;
loop {
let amount = self.decoder.read(&mut buffer)?;
let amount = self.decoder.read(&mut buffer).await?;
self.decoded.extend_from_slice(&buffer[..amount]);

if element_len == 0 && self.decoded.len() >= 4 {
Expand All @@ -68,14 +71,16 @@ impl<'a> BackupImporter<'a> {
Ok(None)
}

pub fn insert(&mut self, provider: &XmtpOpenMlsProvider) -> Result<(), StorageError> {
provider.transaction(|provider| {
let conn = provider.conn_ref();
while let Some(element) = self.next_element()? {
insert(element, conn)?;
}
Ok(())
})
pub async fn insert(&mut self, provider: &XmtpOpenMlsProvider) -> Result<(), StorageError> {
provider
.transaction_async(|provider| async move {
let conn = provider.conn_ref();
while let Some(element) = self.next_element().await? {
insert(element, conn)?;
}
Ok(())
})
.await
}

pub fn metadata(&self) -> &BackupMetadata {
Expand Down

0 comments on commit 0c538c9

Please sign in to comment.