Skip to content

Commit

Permalink
added util and tests (#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
daria-github authored Sep 27, 2023
1 parent 1f36206 commit 246209f
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 29 deletions.
2 changes: 1 addition & 1 deletion xmtp/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ where
#[cfg(test)]
AccountStrategy::ExternalAccount(a) => a,
};
store.insert_or_ignore_user(
store.insert_user(
&mut store.conn()?,
StoredUser {
user_address: account.addr(),
Expand Down
6 changes: 3 additions & 3 deletions xmtp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ where
);

conn.transaction(|transaction_manager| -> Result<(), ClientError> {
self.store.insert_or_ignore_user(
self.store.insert_user(
transaction_manager,
StoredUser {
user_address: user_address.to_string(),
Expand All @@ -238,8 +238,8 @@ where
let session = self.create_uninitialized_session(&install.get_contact()?)?;

self.store
.insert_or_ignore_install(transaction_manager, install)?;
self.store.insert_or_ignore_session(
.insert_install(transaction_manager, install)?;
self.store.insert_session(
transaction_manager,
StoredSession::try_from(&session)?,
)?;
Expand Down
4 changes: 2 additions & 2 deletions xmtp/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ where
) -> Result<(), ConversationError> {
let peer_address = peer_addr_from_convo_id(convo_id, &client.wallet_address())?;
let created_at = now();
client.store.insert_or_ignore_user(
client.store.insert_user(
conn,
StoredUser {
user_address: peer_address.clone(),
Expand All @@ -124,7 +124,7 @@ where
},
)?;

client.store.insert_or_ignore_conversation(
client.store.insert_conversation(
conn,
StoredConversation {
peer_address,
Expand Down
2 changes: 1 addition & 1 deletion xmtp/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl<A: XmtpApiClient> Conversations<A> {

client
.store
.insert_or_ignore_message(conn, stored_message)?;
.insert_message(conn, stored_message)?;

Ok(())
}
Expand Down
85 changes: 63 additions & 22 deletions xmtp/src/storage/encrypted_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub mod schema;

use self::{
models::*,
schema::{accounts, conversations, installations, messages, refresh_jobs, users},
schema::{accounts, conversations, installations, messages, refresh_jobs, users, sessions},
};
use super::{now, StorageError};
use crate::{account::Account, utils::is_wallet_address, Errorer, Fetch, Store};
Expand All @@ -25,7 +25,8 @@ use diesel::{
prelude::*,
r2d2::{ConnectionManager, Pool, PooledConnection},
sql_query,
sql_types::Text,
sql_types::Text, result::DatabaseErrorKind,
result::Error,
};
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use log::warn;
Expand All @@ -45,6 +46,16 @@ pub enum StorageOption {
Persistent(String),
}

pub fn ignore_unique_violation<T>(
result: Result<T, diesel::result::Error>,
) -> Result<(), StorageError> {
match result {
Ok(_) => Ok(()),
Err(Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _)) => Ok(()),
Err(error) => Err(StorageError::from(error)),
}
}

#[allow(dead_code)]
#[derive(Clone)]
/// Manages a Sqlite db for persisting messages and other objects.
Expand Down Expand Up @@ -279,15 +290,16 @@ impl EncryptedMessageStore {
.map_err(|e| e.into())
}

pub fn insert_or_ignore_user(
pub fn insert_user(
&self,
conn: &mut DbConnection,
user: StoredUser,
) -> Result<(), StorageError> {
diesel::insert_or_ignore_into(users::table)
let result = diesel::insert_into(users::table)
.values(user)
.execute(conn)?;
Ok(())
.execute(conn);

ignore_unique_violation(result)
}

pub fn get_conversation(
Expand All @@ -303,15 +315,16 @@ impl EncryptedMessageStore {
Ok(convo_list.pop())
}

pub fn insert_or_ignore_conversation(
pub fn insert_conversation(
&self,
conn: &mut DbConnection,
conversation: StoredConversation,
) -> Result<(), StorageError> {
diesel::insert_or_ignore_into(schema::conversations::table)
let result = diesel::insert_into(conversations::table)
.values(conversation)
.execute(conn)?;
Ok(())
.execute(conn);

ignore_unique_violation(result)
}

pub fn get_contacts(
Expand Down Expand Up @@ -429,37 +442,40 @@ impl EncryptedMessageStore {
Ok(())
}

pub fn insert_or_ignore_install(
pub fn insert_install(
&self,
conn: &mut DbConnection,
install: StoredInstallation,
) -> Result<(), StorageError> {
diesel::insert_or_ignore_into(installations::table)
let result = diesel::insert_into(installations::table)
.values(install)
.execute(conn)?;
Ok(())
.execute(conn);

ignore_unique_violation(result)
}

pub fn insert_or_ignore_session(
pub fn insert_session(
&self,
conn: &mut DbConnection,
session: StoredSession,
) -> Result<(), StorageError> {
diesel::insert_or_ignore_into(schema::sessions::table)
let result = diesel::insert_into(sessions::table)
.values(session)
.execute(conn)?;
Ok(())
.execute(conn);

ignore_unique_violation(result)
}

pub fn insert_or_ignore_message(
pub fn insert_message(
&self,
conn: &mut DbConnection,
msg: NewStoredMessage,
) -> Result<(), StorageError> {
diesel::insert_or_ignore_into(schema::messages::table)
let result = diesel::insert_into(messages::table)
.values(msg)
.execute(conn)?;
Ok(())
.execute(conn);

ignore_unique_violation(result)
}

pub fn commit_outbound_payloads_for_message(
Expand Down Expand Up @@ -756,6 +772,7 @@ mod tests {
};
use std::fs;
use std::{thread::sleep, time::Duration};
use std::boxed::Box;

fn rand_string() -> String {
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
Expand Down Expand Up @@ -1135,4 +1152,28 @@ mod tests {
.unwrap();
assert_eq!(1, results_with_limit.len());
}

#[test]
fn it_returns_ok_when_given_ok_result() {
let result: Result<(), diesel::result::Error> = Ok(());
assert!(super::ignore_unique_violation(result).is_ok(), "Expected Ok(()) when given Ok result");
}

#[test]
fn it_returns_ok_on_unique_violation_error() {
let result: Result<(),diesel::result::Error> = Err(diesel::result::Error::DatabaseError(diesel::result::DatabaseErrorKind::UniqueViolation, Box::new("violation".to_string())));
assert!(super::ignore_unique_violation(result).is_ok(), "Expected Ok(()) when given UniqueViolation error");
}

#[test]
fn it_returns_err_on_non_unique_violation_database_errors() {
let result: Result<(), diesel::result::Error> = Err(diesel::result::Error::DatabaseError(diesel::result::DatabaseErrorKind::NotNullViolation, Box::new("other kind".to_string())));
assert!(super::ignore_unique_violation(result).is_err(), "Expected Err when given non-UniqueViolation database error");
}

#[test]
fn it_returns_err_on_non_database_errors() {
let result: Result<(), diesel::result::Error> = Err(diesel::result::Error::NotFound);
assert!(super::ignore_unique_violation(result).is_err(), "Expected Err when given a non-database error");
}
}

0 comments on commit 246209f

Please sign in to comment.