-
Notifications
You must be signed in to change notification settings - Fork 145
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore(notifications): import kratos email (#4012)
* chore(core): sync email address to notifications * chore(notifications): import email from kratos * chore(notifications): remove redundant novu config * chore(core): update email after verified * chore(notifications): fix linting * chore(notifications): use correct path for google application creds --------- Co-authored-by: Sam Peters <[email protected]> Co-authored-by: Vaibhav <[email protected]>
- Loading branch information
1 parent
43fa9c1
commit f67f2c6
Showing
5 changed files
with
45 additions
and
57 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
use serde::{Deserialize, Serialize}; | ||
|
||
#[derive(Clone, Default, Debug, Serialize, Deserialize)] | ||
pub struct MongoImportConfig { | ||
pub struct KratosImportConfig { | ||
pub execute_import: bool, | ||
pub connection: Option<String>, | ||
pub pg_con: Option<String>, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,55 +1,48 @@ | ||
use anyhow::*; | ||
use futures::stream::StreamExt; | ||
use serde::Deserialize; | ||
use sqlx::{Postgres, QueryBuilder, Row}; | ||
|
||
mod config; | ||
mod mongodb; | ||
|
||
use crate::{app::NotificationsApp, primitives::*}; | ||
pub use config::*; | ||
|
||
#[derive(Debug, Deserialize)] | ||
#[serde(rename_all = "camelCase")] | ||
struct MongoUser { | ||
#[serde(default)] | ||
user_id: Option<String>, | ||
#[serde(default)] | ||
device_tokens: Vec<String>, | ||
} | ||
|
||
pub async fn import_user_notification_settings( | ||
pub async fn import_email_addresses( | ||
app: NotificationsApp, | ||
config: MongoImportConfig, | ||
config: KratosImportConfig, | ||
) -> anyhow::Result<()> { | ||
let client = mongodb::get_client(config).await?; | ||
let db = client.default_database().context("default database")?; | ||
let users = db.collection::<MongoUser>("users"); | ||
let mut cursor = users.find(None, None).await?; | ||
println!("EXECUTING EMAIL IMPORT"); | ||
let pool = sqlx::postgres::PgPoolOptions::new() | ||
.connect(&config.pg_con.expect("pg_con not set")) | ||
.await?; | ||
let mut last_email = String::new(); | ||
let mut total_users = 0; | ||
while let Some(maybe_user) = cursor.next().await { | ||
let user = match maybe_user { | ||
Err(e) => { | ||
println!("Error deserializing user: {:?}", e); | ||
continue; | ||
} | ||
core::result::Result::Ok(user) => user, | ||
}; | ||
if let Some(user_id) = user.user_id { | ||
if !user.device_tokens.is_empty() { | ||
let user_id = GaloyUserId::from(user_id); | ||
for device_token in user.device_tokens { | ||
app.add_push_device_token(user_id.clone(), PushDeviceToken::from(device_token)) | ||
.await?; | ||
} | ||
} | ||
loop { | ||
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new( | ||
r#"SELECT id, traits->>'email' AS email | ||
FROM identities | ||
WHERE traits->>'email' IS NOT NULL | ||
AND traits->>'email' >"#, | ||
); | ||
query_builder.push_bind(&last_email); | ||
query_builder.push("ORDER BY traits->>'email' LIMIT 1000;"); | ||
let query = query_builder.build(); | ||
let res = query.fetch_all(&pool).await?; | ||
if res.is_empty() { | ||
break; | ||
} | ||
|
||
total_users += 1; | ||
if total_users % 100 == 0 { | ||
println!("{total_users} users synced"); | ||
for row in res { | ||
let id: uuid::Uuid = row.get("id"); | ||
let email: String = row.get("email"); | ||
app.update_email_address( | ||
GaloyUserId::from(id.to_string()), | ||
GaloyEmailAddress::from(email.clone()), | ||
) | ||
.await?; | ||
total_users += 1; | ||
last_email = email; | ||
} | ||
println!("First {total_users} synced"); | ||
} | ||
println!("SYNCING FINISHED: {total_users} users sycned"); | ||
|
||
Ok(()) | ||
} |