From 8465df893fb69e5a28781578c613eff7141d012b Mon Sep 17 00:00:00 2001 From: baalmart Date: Fri, 4 Oct 2024 17:20:12 +0300 Subject: [PATCH 1/3] changing the email content based on field activity --- k8s/kafka/topics/kafka-topics.yaml | 33 ++++- src/auth-service/.env.example | 123 +++++++++++++++++++ src/auth-service/bin/jobs/kafka-consumer.js | 129 +++++++++++++++++++- src/auth-service/config/global/envs.js | 2 + src/auth-service/utils/email.msgs.js | 45 +++++++ src/auth-service/utils/mailer.js | 100 +++++++++++++++ 6 files changed, 427 insertions(+), 5 deletions(-) create mode 100644 src/auth-service/.env.example diff --git a/k8s/kafka/topics/kafka-topics.yaml b/k8s/kafka/topics/kafka-topics.yaml index d1304eec2d..088c1d5141 100644 --- a/k8s/kafka/topics/kafka-topics.yaml +++ b/k8s/kafka/topics/kafka-topics.yaml @@ -65,7 +65,7 @@ spec: partitions: 2 replicas: 2 config: - retention.ms: 86400000 + retention.ms: 86400000 --- apiVersion: kafka.strimzi.io/v1beta2 @@ -79,7 +79,7 @@ spec: partitions: 2 replicas: 2 config: - retention.ms: 86400000 + retention.ms: 86400000 --- apiVersion: kafka.strimzi.io/v1beta2 @@ -262,4 +262,31 @@ spec: replicas: 2 config: retention.ms: 18000000 - + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + name: recall-topic + namespace: message-broker + labels: + strimzi.io/cluster: kafka-cluster +spec: + partitions: 2 + replicas: 2 + config: + retention.ms: 18000000 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + name: deploy-topic + namespace: message-broker + labels: + strimzi.io/cluster: kafka-cluster +spec: + partitions: 2 + replicas: 2 + config: + retention.ms: 18000000 diff --git a/src/auth-service/.env.example b/src/auth-service/.env.example new file mode 100644 index 0000000000..4d1ba2cbcf --- /dev/null +++ b/src/auth-service/.env.example @@ -0,0 +1,123 @@ +AIRQO_WEBSITE=https://airqo.net/ +AIRQO_YOUTUBE= +DB_NAME_STAGING= +FIREBASE_DATABASE_URL= +GOOGLE_APPLICATION_CREDENTIALS= +INSTANCE_ID= +JWT_SECRET= +KICKBOX_API_KEY= +LOCAL_DB= +MAILCHIMP_API_KEY= +MAILCHIMP_LIST_ID= +MAILCHIMP_MOBILE_API_KEY= +MAILCHIMP_SERVER_PREFIX= +MAIL_PASS= +MAIL_USER= +MONGO_DEV= +MONGO_GCE_HOST= +MONGO_GCE_PASSWORD= +MONGO_GCE_PORT= +MONGO_GCE_URI= +MONGO_GCE_USERNAME= +MONGO_STAGE= +NODE_PATH= +REQUEST_ACCESS_EMAILS= +HARDWARE_AND_DS_EMAILS= +PLATFORM_AND_DS_EMAILS= +PLATFORM_EMAILS= +COMMS_EMAILS= +POLICY_EMAILS= +CHAMPIONS_EMAILS= +ASSISTANCE_EMAILS= +RESEARCHERS_EMAILS= +DEVELOPERS_EMAILS= +PARTNERS_EMAILS= +PASSWORD_REGEX= +PLATFORM_DEV_BASE_URL= +PLATFORM_STAGING_BASE_URL= +SESSION_SECRET= +MONGO_STAGE_URI= +SUPPORT_EMAIL= +MONGO_PROD_URI= +MONGO_DEV_URI= +FIREBASE_COLLECTION_USERS= +FIREBASE_COLLECTION_KYA= +FIREBASE_COLLECTION_ANALYTICS= +FIREBASE_COLLECTION_NOTIFICATIONS= +FIREBASE_COLLECTION_FAVORITE_PLACES= +PRODUCTS_DEV_EMAIL= +DEFAULT_LIMIT= +SLACK_TOKEN= +SLACK_CHANNEL= +SLACK_USERNAME= +PROD_DEFAULT_NETWORK= +STAGE_DEFAULT_NETWORK= +DEV_DEFAULT_NETWORK= +PROD_DEFAULT_NETWORK_ROLE= +STAGE_DEFAULT_NETWORK_ROLE= +DEV_DEFAULT_NETWORK_ROLE= +MOBILE_APP_USERS_TOPIC= +DEPLOY_TOPIC= +RECALL_TOPIC= +DEFAULT_TENANT= +UNIQUE_CONSUMER_GROUP= +UNIQUE_PRODUCER_GROUP= +NEW_MOBILE_APP_USER_TOPIC= +KAFKA_BOOTSTRAP_SERVERS_DEV= +KAFKA_CLIENT_ID_DEV= +KAFKA_CLIENT_GROUP_DEV= +KAFKA_BOOTSTRAP_SERVERS_STAGE= +KAFKA_CLIENT_ID_STAGE= +KAFKA_CLIENT_GROUP_STAGE= +KAFKA_BOOTSTRAP_SERVERS_PROD= +KAFKA_CLIENT_ID_PROD= +KAFKA_CLIENT_GROUP_PROD= +GOOGLE_CLIENT_ID= +GOOGLE_CLIENT_SECRET= +SUPER_ADMIN_PERMISSIONS= +TENANTS= +NETWORKS= +GMAIL_VERIFICATION_FAILURE_REDIRECT= +GMAIL_VERIFICATION_SUCCESS_REDIRECT= +FIREBASE_API_KEY= +FIREBASE_AUTH_DOMAIN= +FIREBASE_PROJECT_ID= +FIREBASE_TYPE= +FIREBASE_PRIVATE_KEY_ID= +FIREBASE_PRIVATE_KEY= +FIREBASE_CLIENT_EMAIL= +FIREBASE_CLIENT_ID= +FIREBASE_AUTH_URI= +FIREBASE_TOKEN_URI= +FIREBASE_AUTH_PROVIDER_X509_CERT_URL= +FIREBASE_CLIENT_X509_CERT_URL= +FIREBASE_UNIVERSE_DOMAIN= +FIREBASE_AUTHORIZATION_URL= +AIRQO_ANALYTICS_GMAIL= +AIRQO_ANALYTICS_GMAIL_PASS= +MOBILE_APP_PACKAGE_NAME= +MOBILE_APP_DYNAMIC_LINK_DOMAIN= +DEV_REDIS_SERVER= +DEV_REDIS_PORT= +PROD_REDIS_SERVER= +PROD_REDIS_PORT= +STAGE_REDIS_SERVER= +STAGE_REDIS_PORT= +STAGE_DEFAULT_GROUP= +DEV_DEFAULT_GROUP= +PROD_DEFAULT_GROUP= +ANALYTICS_PRODUCTION_BASE_URL= +ANALYTICS_STAGING_BASE_URL= +ANALYTICS_DEV_BASE_URL= +STAGE_DEFAULT_GROUP_ROLE= +PROD_DEFAULT_GROUP_ROLE= +DEV_DEFAULT_GROUP_ROLE= +STAGE_DEFAULT_AIRQLOUD= +PROD_DEFAULT_AIRQLOUD= +DEV_DEFAULT_AIRQLOUD= +STAGE_DEFAULT_GRID= +PROD_DEFAULT_GRID= +DEV_DEFAULT_GRID= +STAGE_DEFAULT_COHORT= +PROD_DEFAULT_COHORT= +DEV_DEFAULT_COHORT= \ No newline at end of file diff --git a/src/auth-service/bin/jobs/kafka-consumer.js b/src/auth-service/bin/jobs/kafka-consumer.js index 8a8ed661c3..942446600c 100644 --- a/src/auth-service/bin/jobs/kafka-consumer.js +++ b/src/auth-service/bin/jobs/kafka-consumer.js @@ -11,14 +11,56 @@ const Joi = require("joi"); const { jsonrepair } = require("jsonrepair"); const BlacklistedIPRangeModel = require("@models/BlacklistedIPRange"); const BlacklistedIPModel = require("@models/BlacklistedIP"); +const UserModel = require("@models/User"); const stringify = require("@utils/stringify"); const rangeCheck = require("ip-range-check"); const asyncRetry = require("async-retry"); +const mongoose = require("mongoose"); +const isEmpty = require("is-empty"); +const ObjectId = mongoose.Types.ObjectId; const userSchema = Joi.object({ email: Joi.string().email().empty("").required(), }).unknown(true); +const extractDeviceDetails = (updatedDevice) => { + const { + _id, + status, + category, + isActive, + long_name, + device_number, + name, + deployment_date, + latitude, + longitude, + mountType, + powerType, + } = updatedDevice; + + return { + status, + category, + _id, + isActive, + long_name, + device_number, + name, + deployment_date, + latitude, + longitude, + mountType, + powerType, + }; +}; + +const extractActivityDetails = (createdActivity) => { + return { + ...createdActivity, // Spread operator to include all properties directly + }; +}; + const operationForNewMobileAppUser = async (messageData) => { try { logger.info( @@ -177,7 +219,89 @@ const operationForBlacklistedIPs = async (messageData) => { } }; -const operationFunction2 = async (messageData) => {}; +const emailsForDeployedDevices = async (messageData) => { + const { createdActivity, updatedDevice, user_id } = JSON.parse(messageData); + + // Validate input data + if (!createdActivity || !updatedDevice || !user_id) { + logger.error("Invalid input data: Missing required fields."); + return; + } + + try { + const user = await UserModel("airqo") + .findOne({ _id: ObjectId(user_id) }, "email _id firstName lastName") + .lean(); + + // Check if user exists + if (!user) { + logger.error(`User not found for user_id: ${user_id}`); + return; + } + + const emailResponse = await mailer.fieldActivity({ + email: user.email, + firstName: user.firstName, + lastName: user.lastName, + deviceDetails: extractDeviceDetails(updatedDevice), + activityDetails: extractActivityDetails(createdActivity), + activityType: "deploy", + }); + + // Handle email response + if (emailResponse && emailResponse.success === false) { + logger.error(`🐛 Internal Server Error -- ${stringify(emailResponse)}`); + } + } catch (error) { + logger.error(`🐛 Internal Server Error -- ${error.message}`); + } +}; + +const emailsForRecalledDevices = async (messageData) => { + // Parse the message and validate input data + let parsedData; + try { + parsedData = JSON.parse(messageData); + } catch (error) { + logger.error("Invalid JSON format in messageData."); + return; + } + + const { createdActivity, updatedDevice, user_id } = parsedData; + + if (!createdActivity || !updatedDevice || !user_id) { + logger.error("Invalid input data: Missing required fields."); + return; + } + + try { + const user = await UserModel("airqo") + .findOne({ _id: ObjectId(user_id) }, "email _id firstName lastName") + .lean(); + + // Check if user exists + if (!user) { + logger.error(`User not found for user_id: ${user_id}`); + return; + } + + const emailResponse = await mailer.fieldActivity({ + email: user.email, + firstName: user.firstName, + lastName: user.lastName, + deviceDetails: extractDeviceDetails(updatedDevice), + activityDetails: extractActivityDetails(createdActivity), + activityType: "recall", + }); + + // Handle email response + if (emailResponse && emailResponse.success === false) { + logger.error(`🐛 Internal Server Error -- ${stringify(emailResponse)}`); + } + } catch (error) { + logger.error(`🐛 Internal Server Error -- ${error.message}`); + } +}; const kafkaConsumer = async () => { try { @@ -196,7 +320,8 @@ const kafkaConsumer = async () => { const topicOperations = { // ["new-mobile-app-user-topic"]: operationForNewMobileAppUser, ["ip-address"]: operationForBlacklistedIPs, - // topic2: operationFunction2, + ["deploy-topic"]: emailsForDeployedDevices, + ["recall-topic"]: emailsForRecalledDevices, }; await consumer.connect(); // Subscribe to all topics in the mapping diff --git a/src/auth-service/config/global/envs.js b/src/auth-service/config/global/envs.js index 10598a7297..0c0f79d332 100644 --- a/src/auth-service/config/global/envs.js +++ b/src/auth-service/config/global/envs.js @@ -29,6 +29,8 @@ const envs = { GOOGLE_CLIENT_ID: process.env.GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET: process.env.GOOGLE_CLIENT_SECRET, MOBILE_APP_USERS_TOPIC: process.env.MOBILE_APP_USERS_TOPIC, + DEPLOY_TOPIC: process.env.DEPLOY_TOPIC, + RECALL_TOPIC: process.env.RECALL_TOPIC, UNIQUE_CONSUMER_GROUP: process.env.UNIQUE_CONSUMER_GROUP, UNIQUE_PRODUCER_GROUP: process.env.UNIQUE_PRODUCER_GROUP, NEW_MOBILE_APP_USER_TOPIC: process.env.NEW_MOBILE_APP_USER_TOPIC, diff --git a/src/auth-service/utils/email.msgs.js b/src/auth-service/utils/email.msgs.js index 3ae15f9331..dfeb8b6446 100644 --- a/src/auth-service/utils/email.msgs.js +++ b/src/auth-service/utils/email.msgs.js @@ -323,6 +323,51 @@ module.exports = { return constants.EMAIL_BODY({ email, content, name }); }, + field_activity: ({ + firstName = "", + lastName = "", + activityDetails = {}, + deviceDetails = {}, + email = "", + activityType = "recall", // New parameter to determine activity type + }) => { + // Create a list of activity details + let activityDetailsList = Object.entries(activityDetails) + .map(([key, value]) => `
  • ${key}: "${value}"
  • `) + .join("\n"); + + // Create a list of device details + let deviceDetailsList = Object.entries(deviceDetails) + .map(([key, value]) => `
  • ${key}: "${value}"
  • `) + .join("\n"); + + const actionMessage = + activityType === "recall" + ? "A device has been recalled in your AirQo system." + : "A device has been deployed in your AirQo system."; + + const content = ` + + + ${actionMessage} +
    + Here are the details: +
    + Activity Details: +
      ${activityDetailsList}
    + Device Details: +
      ${deviceDetailsList}
    +
    + If you have any questions or concerns regarding this action, please contact your organization's administrator. +
    + Access AirQo Analytics here: ${constants.LOGIN_PAGE} +

    + + `; + + const name = `${firstName} ${lastName}`; + return constants.EMAIL_BODY({ email, content, name }); + }, token_compromised: ({ firstName = "", lastName = "", diff --git a/src/auth-service/utils/mailer.js b/src/auth-service/utils/mailer.js index e77ba43238..dfcc2389ab 100644 --- a/src/auth-service/utils/mailer.js +++ b/src/auth-service/utils/mailer.js @@ -49,6 +49,65 @@ let attachments = [ }, ]; +const getSubscribedBccEmails = async () => { + let bccEmails = constants.HARDWARE_AND_DS_EMAILS + ? constants.HARDWARE_AND_DS_EMAILS.split(",") + : []; + let subscribedEmails = []; + + for (const bccEmail of bccEmails.map((email) => email.trim())) { + const checkResult = await SubscriptionModel( + "airqo" + ).checkNotificationStatus({ email: bccEmail, type: "email" }); + if (checkResult.success) subscribedEmails.push(bccEmail); + } + + return subscribedEmails.join(","); +}; + +const createMailOptions = ({ + email, + firstName, + lastName, + activityDetails, + deviceDetails, + bccEmails, + activityType, +} = {}) => { + const subject = + activityType === "recall" + ? "AirQo Analytics: Device Recall Notification" + : "AirQo Analytics: Device Deployment Notification"; + + return { + from: { + name: constants.EMAIL_NAME, + address: constants.EMAIL, + }, + to: email, + subject, + html: msgs.field_activity({ + firstName, + lastName, + email, + activityDetails, + deviceDetails, + activityType, + }), + bcc: bccEmails, + }; +}; +const handleMailResponse = (data) => { + if (isEmpty(data.rejected) && !isEmpty(data.accepted)) { + return { success: true, message: "Email successfully sent", data }; + } else { + throw new HttpError( + "Internal Server Error", + httpStatus.INTERNAL_SERVER_ERROR, + { message: "Email not sent", emailResults: data } + ); + } +}; const mailer = { candidate: async ( { firstName, lastName, email, tenant = "airqo" } = {}, @@ -1896,6 +1955,8 @@ const mailer = { firstName = "", lastName = "", siteActivityDetails = {}, + activityDetails = {}, + deviceDetails = {}, tenant = "airqo", } = {}, next @@ -1941,6 +2002,8 @@ const mailer = { lastName, siteActivityDetails, email, + activityDetails, + deviceDetails, })}`, bcc: subscribedBccEmails, attachments: attachments, @@ -1980,6 +2043,43 @@ const mailer = { return; } }, + + fieldActivity: async ({ + email = "", + firstName = "", + lastName = "", + activityDetails = {}, + deviceDetails = {}, + activityType = "recall", // New parameter to determine activity type + }) => { + try { + const checkResult = await SubscriptionModel( + "airqo" + ).checkNotificationStatus({ email, type: "email" }); + if (!checkResult.success) return checkResult; + + const bccEmails = await getSubscribedBccEmails(); + const mailOptions = createMailOptions({ + email, + firstName, + lastName, + activityDetails, + deviceDetails, + bccEmails, + activityType, + }); + + let response = await transporter.sendMail(mailOptions); + return handleMailResponse(response); + } catch (error) { + logger.error(`🐛🐛 Internal Server Error ${error.message}`); + throw new HttpError( + "Internal Server Error", + httpStatus.INTERNAL_SERVER_ERROR, + { message: error.message } + ); + } + }, compromisedToken: async ( { email = "", From c988e085820416b2e4487ba27d68e61b87cedacc Mon Sep 17 00:00:00 2001 From: baalmart Date: Fri, 4 Oct 2024 18:11:13 +0300 Subject: [PATCH 2/3] Validate user_id before converting to ObjectId --- src/auth-service/bin/jobs/kafka-consumer.js | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/auth-service/bin/jobs/kafka-consumer.js b/src/auth-service/bin/jobs/kafka-consumer.js index 942446600c..bdbcf67d7c 100644 --- a/src/auth-service/bin/jobs/kafka-consumer.js +++ b/src/auth-service/bin/jobs/kafka-consumer.js @@ -220,7 +220,14 @@ const operationForBlacklistedIPs = async (messageData) => { }; const emailsForDeployedDevices = async (messageData) => { - const { createdActivity, updatedDevice, user_id } = JSON.parse(messageData); + let parsedData; + try { + parsedData = JSON.parse(messageData); + } catch (error) { + logger.error("Invalid JSON format in messageData."); + return; + } + const { createdActivity, updatedDevice, user_id } = parsedData; // Validate input data if (!createdActivity || !updatedDevice || !user_id) { @@ -228,6 +235,11 @@ const emailsForDeployedDevices = async (messageData) => { return; } + if (!ObjectId.isValid(user_id)) { + logger.error(`Invalid user_id format: ${user_id}`); + return; + } + try { const user = await UserModel("airqo") .findOne({ _id: ObjectId(user_id) }, "email _id firstName lastName") @@ -274,6 +286,11 @@ const emailsForRecalledDevices = async (messageData) => { return; } + if (!ObjectId.isValid(user_id)) { + logger.error(`Invalid user_id format: ${user_id}`); + return; + } + try { const user = await UserModel("airqo") .findOne({ _id: ObjectId(user_id) }, "email _id firstName lastName") From ee24e885e2381673540e4578fba6dde397d40a6f Mon Sep 17 00:00:00 2001 From: baalmart Date: Fri, 4 Oct 2024 18:12:15 +0300 Subject: [PATCH 3/3] Optimize Asynchronous Operations with Concurrent Processing --- src/auth-service/utils/mailer.js | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/auth-service/utils/mailer.js b/src/auth-service/utils/mailer.js index dfcc2389ab..8c9041efc2 100644 --- a/src/auth-service/utils/mailer.js +++ b/src/auth-service/utils/mailer.js @@ -55,13 +55,23 @@ const getSubscribedBccEmails = async () => { : []; let subscribedEmails = []; - for (const bccEmail of bccEmails.map((email) => email.trim())) { - const checkResult = await SubscriptionModel( - "airqo" - ).checkNotificationStatus({ email: bccEmail, type: "email" }); - if (checkResult.success) subscribedEmails.push(bccEmail); - } - + const checkPromises = bccEmails.map(async (bccEmail) => { + try { + const checkResult = await SubscriptionModel( + "airqo" + ).checkNotificationStatus({ email: bccEmail.trim(), type: "email" }); + return checkResult.success ? bccEmail.trim() : null; + } catch (error) { + logger.error( + `Error checking notification status for ${bccEmail}: ${error.message}` + ); + return null; + } + }); + const successfulEmails = (await Promise.all(checkPromises)).filter( + (email) => email !== null + ); + subscribedEmails = successfulEmails; return subscribedEmails.join(","); };