diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 000000000..5d039c695 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,5 @@ +[toolchain] +channel = "stable" +components = [ "rustc", "cargo", "clippy", "rustfmt", "rust-analyzer" ] +targets = [ "wasm32-unknown-unknown" ] +profile = "default" diff --git a/xmtp/src/account.rs b/xmtp/src/account.rs index f6a337634..cb24ed14c 100644 --- a/xmtp/src/account.rs +++ b/xmtp/src/account.rs @@ -32,17 +32,15 @@ pub enum AccountError { BadAssocation(#[from] AssociationError), #[error("mutex poisoned error")] MutexPoisoned, - #[error("unknown error")] - Unknown, } +/// Holds an account and adds some serialization methods on top pub struct VmacAccount { account: OlmAccount, } -// Struct that holds an account and adds some serialization methods on top impl VmacAccount { - // Create a new instance + /// Create a new instance pub fn new(account: OlmAccount) -> Self { Self { account } } diff --git a/xmtp/src/builder.rs b/xmtp/src/builder.rs index 750eb4231..513724a05 100644 --- a/xmtp/src/builder.rs +++ b/xmtp/src/builder.rs @@ -14,10 +14,10 @@ use xmtp_proto::api_client::XmtpApiClient; #[derive(Error, Debug)] pub enum ClientBuilderError { #[error("Missing parameter: {parameter}")] - MissingParameterError { parameter: &'static str }, + MissingParameter { parameter: &'static str }, #[error("Failed to serialize/deserialize state for persistence: {source}")] - SerializationError { source: serde_json::Error }, + Serialization { source: serde_json::Error }, #[error("Required account was not found in cache.")] RequiredAccountNotFound, @@ -27,17 +27,16 @@ pub enum ClientBuilderError { #[error("Associating an address to account failed")] AssociationFailed(#[from] AssociationError), - // #[error("Error Initalizing Store")] - // StoreInitialization(#[from] SE), - #[error("Error Initalizing Account")] + + #[error("Error Initializing Account")] AccountInitialization(#[from] AccountError), #[error("Storage Error")] StorageError(#[from] StorageError), } -pub enum AccountStrategy { - CreateIfNotFound(O), +pub enum AccountStrategy { + CreateIfNotFound(InboxOwner), CachedOnly(Address), #[cfg(test)] ExternalAccount(Account), @@ -61,24 +60,20 @@ where } } -pub struct ClientBuilder -where - A: XmtpApiClient + Default, - O: InboxOwner, -{ - api_client: Option, +pub struct ClientBuilder { + api_client: Option, network: Network, account: Option, store: Option, - account_strategy: AccountStrategy, + account_strategy: AccountStrategy, } -impl ClientBuilder +impl ClientBuilder where - A: XmtpApiClient + Default, - O: InboxOwner, + ApiClient: XmtpApiClient, + Owner: InboxOwner, { - pub fn new(strat: AccountStrategy) -> Self { + pub fn new(strat: AccountStrategy) -> Self { Self { api_client: None, network: Network::Dev, @@ -88,7 +83,7 @@ where } } - pub fn api_client(mut self, api_client: A) -> Self { + pub fn api_client(mut self, api_client: ApiClient) -> Self { self.api_client = Some(api_client); self } @@ -110,7 +105,7 @@ where /// Fetch account from peristence or generate and sign a new one fn find_or_create_account( - owner: &O, + owner: &Owner, store: &mut EncryptedMessageStore, ) -> Result { let account = Self::retrieve_persisted_account(store)?; @@ -141,7 +136,7 @@ where Ok(accounts.pop()) } - fn sign_new_account(owner: &O) -> Result { + fn sign_new_account(owner: &Owner) -> Result { let sign = |public_key_bytes: Vec| -> Result { let assoc_text = AssociationText::Static { blockchain_address: owner.get_address(), @@ -155,8 +150,9 @@ where Account::generate(sign).map_err(ClientBuilderError::AccountInitialization) } - pub fn build(mut self) -> Result, ClientBuilderError> { - let api_client = self.api_client.take().unwrap_or_default(); + + pub fn build(mut self) -> Result, ClientBuilderError> { + let api_client = self.api_client.take().ok_or(ClientBuilderError::MissingParameter { parameter: "api_client"})?; let mut store = self.store.take().unwrap_or_default(); // Fetch the Account based upon the account strategy. let account = match self.account_strategy { @@ -193,7 +189,6 @@ mod tests { use crate::{ mock_xmtp_api_client::MockXmtpApiClient, storage::{EncryptedMessageStore, StorageOption}, - Client, }; use super::ClientBuilder; @@ -203,6 +198,7 @@ mod tests { let wallet = generate_local_wallet(); Self::new(wallet.into()) + .api_client(MockXmtpApiClient::default()) } } @@ -231,8 +227,9 @@ mod tests { )) .unwrap(); - let client_a: Client = ClientBuilder::new(wallet.clone().into()) + let client_a = ClientBuilder::new(wallet.clone().into()) .store(store_a) + .api_client(MockXmtpApiClient::default()) .build() .unwrap(); let keybytes_a = client_a @@ -251,8 +248,9 @@ mod tests { )) .unwrap(); - let client_b: Client = ClientBuilder::new(wallet.into()) + let client_b = ClientBuilder::new(wallet.into()) .store(store_b) + .api_client(MockXmtpApiClient::default()) .build() .unwrap(); let keybytes_b = client_b diff --git a/xmtp/src/client.rs b/xmtp/src/client.rs index 6754005b9..fd390a3e1 100644 --- a/xmtp/src/client.rs +++ b/xmtp/src/client.rs @@ -61,32 +61,23 @@ impl From<&str> for ClientError { } } -pub struct Client -where - A: XmtpApiClient, -{ - pub api_client: A, +pub struct Client { + pub api_client: ApiClient, pub(crate) network: Network, pub(crate) account: Account, pub store: EncryptedMessageStore, // Temporarily exposed outside crate for CLI client is_initialized: bool, } -impl core::fmt::Debug for Client -where - A: XmtpApiClient, -{ +impl core::fmt::Debug for Client { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!(f, "Client({:?})::{}", self.network, self.account.addr()) } } -impl Client -where - A: XmtpApiClient, -{ +impl Client { pub fn new( - api_client: A, + api_client: ApiClient, network: Network, account: Account, store: EncryptedMessageStore, @@ -103,60 +94,11 @@ where pub fn wallet_address(&self) -> Address { self.account.addr() } - + pub fn installation_id(&self) -> String { self.account.contact().installation_id() } - - pub async fn init(&mut self) -> Result<(), ClientError> { - let app_contact_bundle = self.account.contact(); - let registered_bundles = self.get_contacts(&self.wallet_address()).await?; - - if !registered_bundles - .iter() - .any(|contact| contact.installation_id() == app_contact_bundle.installation_id()) - { - self.publish_user_contact().await?; - } - - self.is_initialized = true; - - // Send any unsent messages - if let Err(err) = Conversations::process_outbound_messages(self).await { - log::error!("Could not process outbound messages on init: {:?}", err) - } - - Ok(()) - } - - pub async fn get_contacts(&self, wallet_address: &str) -> Result, ClientError> { - let topic = build_user_contact_topic(wallet_address.to_string()); - let response = self - .api_client - .query(QueryRequest { - content_topics: vec![topic], - start_time_ns: 0, - end_time_ns: 0, - paging_info: None, - }) - .await?; - - let mut contacts = vec![]; - for envelope in response.envelopes { - let contact_bundle = Contact::from_bytes(envelope.message, wallet_address.to_string()); - match contact_bundle { - Ok(bundle) => { - contacts.push(bundle); - } - Err(err) => { - log::error!("bad contact bundle: {:?}", err); - } - } - } - - Ok(contacts) - } - + pub fn get_session( &self, conn: &mut DbConnection, @@ -179,84 +121,6 @@ where .collect()) } - pub async fn refresh_user_installations_if_stale( - &self, - user_address: &str, - ) -> Result<(), ClientError> { - let user = self.store.get_user(&mut self.store.conn()?, user_address)?; - if user.is_none() || user.unwrap().last_refreshed < now() - INSTALLATION_REFRESH_INTERVAL_NS - { - self.refresh_user_installations(user_address).await?; - } - - Ok(()) - } - - /// Fetch Installations from the Network and create unintialized sessions for newly discovered contacts - // TODO: Reduce Visibility - pub async fn refresh_user_installations(&self, user_address: &str) -> Result<(), ClientError> { - // Store the timestamp of when the refresh process begins - let refresh_timestamp = now(); - - let self_install_id = key_fingerprint(&self.account.identity_keys().curve25519); - let contacts = self.get_contacts(user_address).await?; - let conn = &mut self.store.conn()?; - debug!( - "Fetched contacts for address {}: {:?}", - user_address, contacts - ); - - let installation_map = self - .store - .get_installations(conn, user_address)? - .into_iter() - .map(|v| (v.installation_id.clone(), v)) - .collect::>(); - - let new_installs: Vec = contacts - .iter() - .filter(|contact| self_install_id != contact.installation_id()) - .filter(|contact| !installation_map.contains_key(&contact.installation_id())) - .filter_map(|contact| StoredInstallation::new(contact).ok()) - .collect(); - debug!( - "New installs for address {}: {:?}", - user_address, new_installs - ); - - conn.transaction(|transaction_manager| -> Result<(), ClientError> { - self.store.insert_user( - transaction_manager, - StoredUser { - user_address: user_address.to_string(), - created_at: now(), - last_refreshed: refresh_timestamp, - }, - )?; - for install in new_installs { - info!("Saving Install {}", install.installation_id); - let session = self.create_uninitialized_session(&install.get_contact()?)?; - - self.store - .insert_install(transaction_manager, install)?; - self.store.insert_session( - transaction_manager, - StoredSession::try_from(&session)?, - )?; - } - - self.store.update_user_refresh_timestamp( - transaction_manager, - user_address, - refresh_timestamp, - )?; - - Ok(()) - })?; - - Ok(()) - } - pub fn get_contacts_from_db( &self, conn: &mut DbConnection, @@ -306,13 +170,67 @@ where if let Err(e) = session.store(conn) { match e { - StorageError::DieselResultError(_) => log::warn!("Session Already exists"), // TODO: Some thought is needed here, is this a critical error which should unroll? + StorageError::DieselResult(_) => log::warn!("Session Already exists"), // TODO: Some thought is needed here, is this a critical error which should unroll? other_error => return Err(other_error.into()), } } Ok((session, create_result.plaintext)) } +} + +impl Client +where + ApiClient: XmtpApiClient +{ + pub async fn init(&mut self) -> Result<(), ClientError> { + let app_contact_bundle = self.account.contact(); + let registered_bundles = self.get_contacts(&self.wallet_address()).await?; + + if !registered_bundles + .iter() + .any(|contact| contact.installation_id() == app_contact_bundle.installation_id()) + { + self.publish_user_contact().await?; + } + + self.is_initialized = true; + + // Send any unsent messages + if let Err(err) = Conversations::process_outbound_messages(self).await { + log::error!("Could not process outbound messages on init: {:?}", err) + } + + Ok(()) + } + + pub async fn get_contacts(&self, wallet_address: &str) -> Result, ClientError> { + let topic = build_user_contact_topic(wallet_address.to_string()); + let response = self + .api_client + .query(QueryRequest { + content_topics: vec![topic], + start_time_ns: 0, + end_time_ns: 0, + paging_info: None, + }) + .await?; + + let mut contacts = vec![]; + for envelope in response.envelopes { + let contact_bundle = Contact::from_bytes(envelope.message, wallet_address.to_string()); + match contact_bundle { + Ok(bundle) => { + contacts.push(bundle); + } + Err(err) => { + log::error!("bad contact bundle: {:?}", err); + } + } + } + + Ok(contacts) + } async fn publish_user_contact(&self) -> Result<(), ClientError> { let envelope = self.build_contact_envelope()?; @@ -375,6 +293,84 @@ where } Ok(None) } + + pub async fn refresh_user_installations_if_stale( + &self, + user_address: &str, + ) -> Result<(), ClientError> { + let user = self.store.get_user(&mut self.store.conn()?, user_address)?; + if user.is_none() || user.unwrap().last_refreshed < now() - INSTALLATION_REFRESH_INTERVAL_NS + { + self.refresh_user_installations(user_address).await?; + } + + Ok(()) + } + + /// Fetch Installations from the Network and create uninitialized sessions for newly discovered contacts + // TODO: Reduce Visibility + pub async fn refresh_user_installations(&self, user_address: &str) -> Result<(), ClientError> { + // Store the timestamp of when the refresh process begins + let refresh_timestamp = now(); + + let self_install_id = key_fingerprint(&self.account.identity_keys().curve25519); + let contacts = self.get_contacts(user_address).await?; + let conn = &mut self.store.conn()?; + debug!( + "Fetched contacts for address {}: {:?}", + user_address, contacts + ); + + let installation_map = self + .store + .get_installations(conn, user_address)? + .into_iter() + .map(|v| (v.installation_id.clone(), v)) + .collect::>(); + + let new_installs: Vec = contacts + .iter() + .filter(|contact| self_install_id != contact.installation_id()) + .filter(|contact| !installation_map.contains_key(&contact.installation_id())) + .filter_map(|contact| StoredInstallation::new(contact).ok()) + .collect(); + debug!( + "New installs for address {}: {:?}", + user_address, new_installs + ); + + conn.transaction(|transaction_manager| -> Result<(), ClientError> { + self.store.insert_user( + transaction_manager, + StoredUser { + user_address: user_address.to_string(), + created_at: now(), + last_refreshed: refresh_timestamp, + }, + )?; + for install in new_installs { + info!("Saving Install {}", install.installation_id); + let session = self.create_uninitialized_session(&install.get_contact()?)?; + + self.store + .insert_install(transaction_manager, install)?; + self.store.insert_session( + transaction_manager, + StoredSession::try_from(&session)?, + )?; + } + + self.store.update_user_refresh_timestamp( + transaction_manager, + user_address, + refresh_timestamp, + )?; + + Ok(()) + })?; + + Ok(()) + } } #[cfg(test)] @@ -448,7 +444,4 @@ mod tests { } } } - - #[tokio::test] - async fn test_roundtrip_encrypt() {} } diff --git a/xmtp/src/contact.rs b/xmtp/src/contact.rs index 9e4f11b7d..43f2cd263 100644 --- a/xmtp/src/contact.rs +++ b/xmtp/src/contact.rs @@ -24,9 +24,8 @@ pub enum ContactError { Decode(#[from] DecodeError), #[error("encode error")] Encode(#[from] EncodeError), - #[error("unknown error")] - Unknown, } + #[derive(Clone, Debug)] pub struct Contact { pub(crate) bundle: InstallationContactBundle, diff --git a/xmtp/src/conversation.rs b/xmtp/src/conversation.rs index b41cfd04a..8c3413689 100644 --- a/xmtp/src/conversation.rs +++ b/xmtp/src/conversation.rs @@ -12,7 +12,6 @@ use crate::{ Client, Store, }; use xmtp_proto::api_client::XmtpApiClient; -use xmtp_proto::xmtp::message_api::v1::PublishRequest; use prost::{DecodeError, Message}; // use async_trait::async_trait; diff --git a/xmtp/src/lib.rs b/xmtp/src/lib.rs index aca727d50..04a0e516a 100644 --- a/xmtp/src/lib.rs +++ b/xmtp/src/lib.rs @@ -29,10 +29,6 @@ pub trait Signable { fn bytes_to_sign(&self) -> Vec; } -pub trait Errorer { - type Error; -} - // Inserts a model to the underlying data store pub trait Store { fn store(&self, into: &mut I) -> Result<(), StorageError>; diff --git a/xmtp/src/mock_xmtp_api_client.rs b/xmtp/src/mock_xmtp_api_client.rs index ab6ce71c0..2d49b630e 100644 --- a/xmtp/src/mock_xmtp_api_client.rs +++ b/xmtp/src/mock_xmtp_api_client.rs @@ -98,7 +98,7 @@ impl XmtpApiClient for MockXmtpApiClient { Err(Error::new(ErrorKind::SubscribeError)) } - async fn batch_query(&self, request: BatchQueryRequest) -> Result { + async fn batch_query(&self, _request: BatchQueryRequest) -> Result { Err(Error::new(ErrorKind::BatchQueryError)) } } diff --git a/xmtp/src/session.rs b/xmtp/src/session.rs index bdc0de213..be3320944 100644 --- a/xmtp/src/session.rs +++ b/xmtp/src/session.rs @@ -97,7 +97,7 @@ impl TryFrom<&StoredSession> for SessionManager { type Error = StorageError; fn try_from(value: &StoredSession) -> Result { let pickle = serde_json::from_slice(&value.vmac_session_data) - .map_err(|_| StorageError::SerializationError)?; + .map_err(|_| StorageError::Serialization)?; Ok(Self::new( OlmSession::from_pickle(pickle), @@ -117,7 +117,7 @@ impl TryFrom<&SessionManager> for StoredSession { // TODO: Better error handling approach. StoreError and SessionError end up being dependent on eachother value .session_bytes() - .map_err(|_| StorageError::SerializationError)?, + .map_err(|_| StorageError::Serialization)?, value.user_address.clone(), )) } diff --git a/xmtp/src/storage/encrypted_store/mod.rs b/xmtp/src/storage/encrypted_store/mod.rs index 6164958aa..574cdc8f9 100644 --- a/xmtp/src/storage/encrypted_store/mod.rs +++ b/xmtp/src/storage/encrypted_store/mod.rs @@ -1,11 +1,11 @@ //! A durable object store powered by Sqlite and Diesel. //! -//! Provides mechanism to store objects between sessions. The behavor of the store can be tailored by +//! Provides mechanism to store objects between sessions. The behavior of the store can be tailored by //! choosing an appropriate `StoreOption`. //! //! ## Migrations //! -//! Table definitions are located `/migrations/`. On intialization the store will see if +//! Table definitions are located `/migrations/`. On initialization the store will see if //! there are any outstanding database migrations and perform them as needed. When updating the table //! definitions `schema.rs` must also be updated. To generate the correct schemas you can run //! `diesel print-schema` or use `cargo run update-schema` which will update the files for you. @@ -19,7 +19,7 @@ use self::{ schema::{accounts, conversations, installations, messages, refresh_jobs, users, sessions}, }; use super::{now, StorageError}; -use crate::{account::Account, utils::is_wallet_address, Errorer, Fetch, Store}; +use crate::{account::Account, utils::is_wallet_address, Fetch, Store}; use diesel::{ connection::SimpleConnection, prelude::*, @@ -64,14 +64,10 @@ pub struct EncryptedMessageStore { pool: Pool>, } -impl Errorer for EncryptedMessageStore { - type Error = StorageError; -} - impl Default for EncryptedMessageStore { fn default() -> Self { Self::new(StorageOption::Ephemeral, Self::generate_enc_key()) - .expect("Error Occured: tring to create default Ephemeral store") + .expect("Error Occurred: trying to create default Ephemeral store") } } @@ -93,11 +89,11 @@ impl EncryptedMessageStore { StorageOption::Ephemeral => Pool::builder() .max_size(1) .build(ConnectionManager::::new(":memory:")) - .map_err(|e| StorageError::DbInitError(e.to_string()))?, + .map_err(|e| StorageError::DbInit(e.to_string()))?, StorageOption::Persistent(ref path) => Pool::builder() .max_size(10) .build(ConnectionManager::::new(path)) - .map_err(|e| StorageError::DbInitError(e.to_string()))?, + .map_err(|e| StorageError::DbInit(e.to_string()))?, }; // // Setup SqlCipherKey @@ -121,7 +117,7 @@ impl EncryptedMessageStore { let conn = &mut self.conn()?; conn.run_pending_migrations(MIGRATIONS) - .map_err(|e| StorageError::DbInitError(e.to_string()))?; + .map_err(|e| StorageError::DbInit(e.to_string()))?; Ok(()) } @@ -138,7 +134,7 @@ impl EncryptedMessageStore { let conn = self .pool .get() - .map_err(|e| StorageError::PoolError(e.to_string()))?; + .map_err(|e| StorageError::Pool(e.to_string()))?; Ok(conn) } @@ -149,7 +145,7 @@ impl EncryptedMessageStore { ) -> Result<(), StorageError> { let conn = &mut pool .get() - .map_err(|e| StorageError::PoolError(e.to_string()))?; + .map_err(|e| StorageError::Pool(e.to_string()))?; conn.batch_execute(&format!( "PRAGMA key = \"x'{}'\";", @@ -631,7 +627,7 @@ impl Fetch for DbConnection { messages .load::(self) - .map_err(StorageError::DieselResultError) + .map_err(StorageError::DieselResult) } fn fetch_one(&mut self, key: i32) -> Result, StorageError> where { @@ -647,7 +643,7 @@ impl Fetch for DbConnection { sessions .load::(self) - .map_err(StorageError::DieselResultError) + .map_err(StorageError::DieselResult) } fn fetch_one(&mut self, key: &str) -> Result, StorageError> { @@ -663,7 +659,7 @@ impl Fetch for DbConnection { dsl::users .load::(self) - .map_err(StorageError::DieselResultError) + .map_err(StorageError::DieselResult) } fn fetch_one(&mut self, key: &str) -> Result, StorageError> { use self::schema::users::dsl::*; @@ -678,7 +674,7 @@ impl Fetch for DbConnection { dsl::conversations .load::(self) - .map_err(StorageError::DieselResultError) + .map_err(StorageError::DieselResult) } fn fetch_one(&mut self, key: &str) -> Result, StorageError> { use self::schema::conversations::dsl::*; @@ -742,7 +738,7 @@ impl Fetch for DbConnection { dsl::installations .load::(self) - .map_err(StorageError::DieselResultError) + .map_err(StorageError::DieselResult) } fn fetch_one(&mut self, key: &str) -> Result, StorageError> { use self::schema::installations::dsl::*; @@ -927,8 +923,8 @@ mod tests { let res = EncryptedMessageStore::new(StorageOption::Persistent(db_path.clone()), enc_key); // Ensure it fails match res.err() { - Some(StorageError::DbInitError(_)) => (), - _ => panic!("Expected a DbInitError"), + Some(StorageError::DbInit(_)) => (), + _ => panic!("Expected a DbInit"), } fs::remove_file(db_path).unwrap(); } diff --git a/xmtp/src/storage/errors.rs b/xmtp/src/storage/errors.rs index 27566eedc..cf2e5f0c5 100644 --- a/xmtp/src/storage/errors.rs +++ b/xmtp/src/storage/errors.rs @@ -1,24 +1,19 @@ -use crate::contact::ContactError; use thiserror::Error; #[derive(Debug, Error)] pub enum StorageError { #[error("Diesel connection error")] - DieselConnectError(#[from] diesel::ConnectionError), + DieselConnect(#[from] diesel::ConnectionError), #[error("Diesel result error: {0}")] - DieselResultError(#[from] diesel::result::Error), + DieselResult(#[from] diesel::result::Error), #[error("Pool error {0}")] - PoolError(String), + Pool(String), #[error("Either incorrect encryptionkey or file is not a db {0}")] - DbInitError(String), + DbInit(String), #[error("Store Error")] Store(String), - #[error(transparent)] - ImplementationError(#[from] anyhow::Error), - #[error("ContactError")] - ContactError(#[from] ContactError), #[error("serialization error")] - SerializationError, + Serialization, #[error("unknown storage error: {0}")] Unknown(String), } diff --git a/xmtp_api_grpc/src/grpc_api_helper.rs b/xmtp_api_grpc/src/grpc_api_helper.rs index dcc9a95ce..6766db467 100644 --- a/xmtp_api_grpc/src/grpc_api_helper.rs +++ b/xmtp_api_grpc/src/grpc_api_helper.rs @@ -96,13 +96,6 @@ impl Client { } } -impl Default for Client { - fn default() -> Self { - //TODO: Remove once Default constraint lifted from clientBuilder - unimplemented!() - } -} - #[async_trait] impl XmtpApiClient for Client { type Subscription = Subscription;