Skip to content

Commit

Permalink
Merge pull request #3554 from airqo-platform/kafka-field-activities
Browse files Browse the repository at this point in the history
enhancing the emails for field activities (deploy and recall) with more specific content
  • Loading branch information
Baalmart authored Oct 4, 2024
2 parents 5d6fe9a + ee24e88 commit 1668ee2
Show file tree
Hide file tree
Showing 6 changed files with 454 additions and 5 deletions.
33 changes: 30 additions & 3 deletions k8s/kafka/topics/kafka-topics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ spec:
partitions: 2
replicas: 2
config:
retention.ms: 86400000
retention.ms: 86400000

---
apiVersion: kafka.strimzi.io/v1beta2
Expand All @@ -79,7 +79,7 @@ spec:
partitions: 2
replicas: 2
config:
retention.ms: 86400000
retention.ms: 86400000

---
apiVersion: kafka.strimzi.io/v1beta2
Expand Down Expand Up @@ -262,4 +262,31 @@ spec:
replicas: 2
config:
retention.ms: 18000000


---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: recall-topic
namespace: message-broker
labels:
strimzi.io/cluster: kafka-cluster
spec:
partitions: 2
replicas: 2
config:
retention.ms: 18000000

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: deploy-topic
namespace: message-broker
labels:
strimzi.io/cluster: kafka-cluster
spec:
partitions: 2
replicas: 2
config:
retention.ms: 18000000
123 changes: 123 additions & 0 deletions src/auth-service/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
AIRQO_WEBSITE=https://airqo.net/
AIRQO_YOUTUBE=
DB_NAME_STAGING=
FIREBASE_DATABASE_URL=
GOOGLE_APPLICATION_CREDENTIALS=
INSTANCE_ID=
JWT_SECRET=
KICKBOX_API_KEY=
LOCAL_DB=
MAILCHIMP_API_KEY=
MAILCHIMP_LIST_ID=
MAILCHIMP_MOBILE_API_KEY=
MAILCHIMP_SERVER_PREFIX=
MAIL_PASS=
MAIL_USER=
MONGO_DEV=
MONGO_GCE_HOST=
MONGO_GCE_PASSWORD=
MONGO_GCE_PORT=
MONGO_GCE_URI=
MONGO_GCE_USERNAME=
MONGO_STAGE=
NODE_PATH=
REQUEST_ACCESS_EMAILS=
HARDWARE_AND_DS_EMAILS=
PLATFORM_AND_DS_EMAILS=
PLATFORM_EMAILS=
COMMS_EMAILS=
POLICY_EMAILS=
CHAMPIONS_EMAILS=
ASSISTANCE_EMAILS=
RESEARCHERS_EMAILS=
DEVELOPERS_EMAILS=
PARTNERS_EMAILS=
PASSWORD_REGEX=
PLATFORM_DEV_BASE_URL=
PLATFORM_STAGING_BASE_URL=
SESSION_SECRET=
MONGO_STAGE_URI=
SUPPORT_EMAIL=
MONGO_PROD_URI=
MONGO_DEV_URI=
FIREBASE_COLLECTION_USERS=
FIREBASE_COLLECTION_KYA=
FIREBASE_COLLECTION_ANALYTICS=
FIREBASE_COLLECTION_NOTIFICATIONS=
FIREBASE_COLLECTION_FAVORITE_PLACES=
PRODUCTS_DEV_EMAIL=
DEFAULT_LIMIT=
SLACK_TOKEN=
SLACK_CHANNEL=
SLACK_USERNAME=
PROD_DEFAULT_NETWORK=
STAGE_DEFAULT_NETWORK=
DEV_DEFAULT_NETWORK=
PROD_DEFAULT_NETWORK_ROLE=
STAGE_DEFAULT_NETWORK_ROLE=
DEV_DEFAULT_NETWORK_ROLE=
MOBILE_APP_USERS_TOPIC=
DEPLOY_TOPIC=
RECALL_TOPIC=
DEFAULT_TENANT=
UNIQUE_CONSUMER_GROUP=
UNIQUE_PRODUCER_GROUP=
NEW_MOBILE_APP_USER_TOPIC=
KAFKA_BOOTSTRAP_SERVERS_DEV=
KAFKA_CLIENT_ID_DEV=
KAFKA_CLIENT_GROUP_DEV=
KAFKA_BOOTSTRAP_SERVERS_STAGE=
KAFKA_CLIENT_ID_STAGE=
KAFKA_CLIENT_GROUP_STAGE=
KAFKA_BOOTSTRAP_SERVERS_PROD=
KAFKA_CLIENT_ID_PROD=
KAFKA_CLIENT_GROUP_PROD=
GOOGLE_CLIENT_ID=
GOOGLE_CLIENT_SECRET=
SUPER_ADMIN_PERMISSIONS=
TENANTS=
NETWORKS=
GMAIL_VERIFICATION_FAILURE_REDIRECT=
GMAIL_VERIFICATION_SUCCESS_REDIRECT=
FIREBASE_API_KEY=
FIREBASE_AUTH_DOMAIN=
FIREBASE_PROJECT_ID=
FIREBASE_TYPE=
FIREBASE_PRIVATE_KEY_ID=
FIREBASE_PRIVATE_KEY=
FIREBASE_CLIENT_EMAIL=
FIREBASE_CLIENT_ID=
FIREBASE_AUTH_URI=
FIREBASE_TOKEN_URI=
FIREBASE_AUTH_PROVIDER_X509_CERT_URL=
FIREBASE_CLIENT_X509_CERT_URL=
FIREBASE_UNIVERSE_DOMAIN=
FIREBASE_AUTHORIZATION_URL=
AIRQO_ANALYTICS_GMAIL=
AIRQO_ANALYTICS_GMAIL_PASS=
MOBILE_APP_PACKAGE_NAME=
MOBILE_APP_DYNAMIC_LINK_DOMAIN=
DEV_REDIS_SERVER=
DEV_REDIS_PORT=
PROD_REDIS_SERVER=
PROD_REDIS_PORT=
STAGE_REDIS_SERVER=
STAGE_REDIS_PORT=
STAGE_DEFAULT_GROUP=
DEV_DEFAULT_GROUP=
PROD_DEFAULT_GROUP=
ANALYTICS_PRODUCTION_BASE_URL=
ANALYTICS_STAGING_BASE_URL=
ANALYTICS_DEV_BASE_URL=
STAGE_DEFAULT_GROUP_ROLE=
PROD_DEFAULT_GROUP_ROLE=
DEV_DEFAULT_GROUP_ROLE=
STAGE_DEFAULT_AIRQLOUD=
PROD_DEFAULT_AIRQLOUD=
DEV_DEFAULT_AIRQLOUD=
STAGE_DEFAULT_GRID=
PROD_DEFAULT_GRID=
DEV_DEFAULT_GRID=
STAGE_DEFAULT_COHORT=
PROD_DEFAULT_COHORT=
DEV_DEFAULT_COHORT=
146 changes: 144 additions & 2 deletions src/auth-service/bin/jobs/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,56 @@ const Joi = require("joi");
const { jsonrepair } = require("jsonrepair");
const BlacklistedIPRangeModel = require("@models/BlacklistedIPRange");
const BlacklistedIPModel = require("@models/BlacklistedIP");
const UserModel = require("@models/User");
const stringify = require("@utils/stringify");
const rangeCheck = require("ip-range-check");
const asyncRetry = require("async-retry");
const mongoose = require("mongoose");
const isEmpty = require("is-empty");
const ObjectId = mongoose.Types.ObjectId;

const userSchema = Joi.object({
email: Joi.string().email().empty("").required(),
}).unknown(true);

const extractDeviceDetails = (updatedDevice) => {
const {
_id,
status,
category,
isActive,
long_name,
device_number,
name,
deployment_date,
latitude,
longitude,
mountType,
powerType,
} = updatedDevice;

return {
status,
category,
_id,
isActive,
long_name,
device_number,
name,
deployment_date,
latitude,
longitude,
mountType,
powerType,
};
};

const extractActivityDetails = (createdActivity) => {
return {
...createdActivity, // Spread operator to include all properties directly
};
};

const operationForNewMobileAppUser = async (messageData) => {
try {
logger.info(
Expand Down Expand Up @@ -177,7 +219,106 @@ const operationForBlacklistedIPs = async (messageData) => {
}
};

const operationFunction2 = async (messageData) => {};
const emailsForDeployedDevices = async (messageData) => {
let parsedData;
try {
parsedData = JSON.parse(messageData);
} catch (error) {
logger.error("Invalid JSON format in messageData.");
return;
}
const { createdActivity, updatedDevice, user_id } = parsedData;

// Validate input data
if (!createdActivity || !updatedDevice || !user_id) {
logger.error("Invalid input data: Missing required fields.");
return;
}

if (!ObjectId.isValid(user_id)) {
logger.error(`Invalid user_id format: ${user_id}`);
return;
}

try {
const user = await UserModel("airqo")
.findOne({ _id: ObjectId(user_id) }, "email _id firstName lastName")
.lean();

// Check if user exists
if (!user) {
logger.error(`User not found for user_id: ${user_id}`);
return;
}

const emailResponse = await mailer.fieldActivity({
email: user.email,
firstName: user.firstName,
lastName: user.lastName,
deviceDetails: extractDeviceDetails(updatedDevice),
activityDetails: extractActivityDetails(createdActivity),
activityType: "deploy",
});

// Handle email response
if (emailResponse && emailResponse.success === false) {
logger.error(`🐛 Internal Server Error -- ${stringify(emailResponse)}`);
}
} catch (error) {
logger.error(`🐛 Internal Server Error -- ${error.message}`);
}
};

const emailsForRecalledDevices = async (messageData) => {
// Parse the message and validate input data
let parsedData;
try {
parsedData = JSON.parse(messageData);
} catch (error) {
logger.error("Invalid JSON format in messageData.");
return;
}

const { createdActivity, updatedDevice, user_id } = parsedData;

if (!createdActivity || !updatedDevice || !user_id) {
logger.error("Invalid input data: Missing required fields.");
return;
}

if (!ObjectId.isValid(user_id)) {
logger.error(`Invalid user_id format: ${user_id}`);
return;
}

try {
const user = await UserModel("airqo")
.findOne({ _id: ObjectId(user_id) }, "email _id firstName lastName")
.lean();

// Check if user exists
if (!user) {
logger.error(`User not found for user_id: ${user_id}`);
return;
}

const emailResponse = await mailer.fieldActivity({
email: user.email,
firstName: user.firstName,
lastName: user.lastName,
deviceDetails: extractDeviceDetails(updatedDevice),
activityDetails: extractActivityDetails(createdActivity),
activityType: "recall",
});

// Handle email response
if (emailResponse && emailResponse.success === false) {
logger.error(`🐛 Internal Server Error -- ${stringify(emailResponse)}`);
}
} catch (error) {
logger.error(`🐛 Internal Server Error -- ${error.message}`);
}
};

const kafkaConsumer = async () => {
try {
Expand All @@ -196,7 +337,8 @@ const kafkaConsumer = async () => {
const topicOperations = {
// ["new-mobile-app-user-topic"]: operationForNewMobileAppUser,
["ip-address"]: operationForBlacklistedIPs,
// topic2: operationFunction2,
["deploy-topic"]: emailsForDeployedDevices,
["recall-topic"]: emailsForRecalledDevices,
};
await consumer.connect();
// Subscribe to all topics in the mapping
Expand Down
2 changes: 2 additions & 0 deletions src/auth-service/config/global/envs.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const envs = {
GOOGLE_CLIENT_ID: process.env.GOOGLE_CLIENT_ID,
GOOGLE_CLIENT_SECRET: process.env.GOOGLE_CLIENT_SECRET,
MOBILE_APP_USERS_TOPIC: process.env.MOBILE_APP_USERS_TOPIC,
DEPLOY_TOPIC: process.env.DEPLOY_TOPIC,
RECALL_TOPIC: process.env.RECALL_TOPIC,
UNIQUE_CONSUMER_GROUP: process.env.UNIQUE_CONSUMER_GROUP,
UNIQUE_PRODUCER_GROUP: process.env.UNIQUE_PRODUCER_GROUP,
NEW_MOBILE_APP_USER_TOPIC: process.env.NEW_MOBILE_APP_USER_TOPIC,
Expand Down
Loading

0 comments on commit 1668ee2

Please sign in to comment.