diff --git a/libs/gql-schema/schema.ts b/libs/gql-schema/schema.ts index a1526a757..d18c4867a 100644 --- a/libs/gql-schema/schema.ts +++ b/libs/gql-schema/schema.ts @@ -210,6 +210,8 @@ const rootSchema = ` input CampaignExportInput { campaignId: String! + # For Spoke Exports + campaignTitle: String exportType: CampaignExportType! spokeOptions: ExportForSpokeInput vanOptions: ExportForVanInput diff --git a/src/containers/AdminCampaignStats/components/CampaignExportModal.tsx b/src/containers/AdminCampaignStats/components/CampaignExportModal.tsx index 1b3cf4457..48f09df17 100644 --- a/src/containers/AdminCampaignStats/components/CampaignExportModal.tsx +++ b/src/containers/AdminCampaignStats/components/CampaignExportModal.tsx @@ -14,13 +14,21 @@ import React, { useState } from "react"; interface CampaignExportModalProps { open: boolean; campaignId: string; + campaignTitle: string; onError(errorMessage: string): void; onClose(): void; onComplete(): void; } const CampaignExportModal: React.FC = (props) => { - const { campaignId, open, onClose, onComplete, onError } = props; + const { + campaignId, + campaignTitle, + open, + onClose, + onComplete, + onError + } = props; const [exportCampaign, setExportCampaign] = useState(true); const [exportMessages, setExportMessages] = useState(true); const [exportOptOut, setExportOptOut] = useState(false); @@ -39,6 +47,7 @@ const CampaignExportModal: React.FC = (props) => { variables: { options: { campaignId, + campaignTitle, exportType: CampaignExportType.Spoke, spokeOptions: { campaign: exportCampaign, diff --git a/src/containers/AdminCampaignStats/index.jsx b/src/containers/AdminCampaignStats/index.jsx index c1df0fd8e..01a76b33d 100644 --- a/src/containers/AdminCampaignStats/index.jsx +++ b/src/containers/AdminCampaignStats/index.jsx @@ -439,6 +439,7 @@ class AdminCampaignStats extends React.Component { /> () => ({ - mutation: gql` - mutation exportCampaign($campaignId: String!) { - exportCampaign( - options: { campaignId: $campaignId, exportType: SPOKE } - ) { - id - } - } - `, - variables: { campaignId: ownProps.match.params.campaignId } - }), copyCampaign: (ownProps) => () => ({ mutation: gql` mutation copyCampaign($campaignId: String!) { diff --git a/src/schema.graphql b/src/schema.graphql index d13001ff5..15cf5bdc1 100644 --- a/src/schema.graphql +++ b/src/schema.graphql @@ -176,6 +176,8 @@ input ExportForSpokeInput { input CampaignExportInput { campaignId: String! + # For Spoke Exports + campaignTitle: String exportType: CampaignExportType! spokeOptions: ExportForSpokeInput vanOptions: ExportForVanInput diff --git a/src/server/api/root-mutations.ts b/src/server/api/root-mutations.ts index bf6fb28fe..230e6d080 100644 --- a/src/server/api/root-mutations.ts +++ b/src/server/api/root-mutations.ts @@ -30,7 +30,7 @@ import MemoizeHelper, { cacheOpts } from "../memoredis"; import { cacheableData, r } from "../models"; import { getUserById } from "../models/cacheable_queries"; import { Notifications, sendUserNotification } from "../notifications"; -import { addExportCampaign } from "../tasks/export-campaign"; +import { addExportCampaign } from "../tasks/chunk-tasks/export-campaign"; import { addExportForVan } from "../tasks/export-for-van"; import { TASK_IDENTIFIER as exportOptOutsIdentifier } from "../tasks/export-opt-outs"; import { addFilterLandlines } from "../tasks/filter-landlines"; @@ -296,7 +296,13 @@ const rootMutations = { }, exportCampaign: async (_root, { options }, { user, loaders }) => { - const { campaignId, exportType, vanOptions, spokeOptions } = options; + const { + campaignId, + campaignTitle, + exportType, + vanOptions, + spokeOptions + } = options; if (exportType === CampaignExportType.VAN && !vanOptions) { throw new Error("Input must include vanOptions when exporting as VAN!"); @@ -313,6 +319,7 @@ const rootMutations = { if (exportType === CampaignExportType.SPOKE) { return addExportCampaign({ campaignId, + campaignTitle, requesterId: user.id, spokeOptions }); diff --git a/src/server/tasks/chunk-tasks/export-campaign/index.ts b/src/server/tasks/chunk-tasks/export-campaign/index.ts new file mode 100644 index 000000000..e32efb687 --- /dev/null +++ b/src/server/tasks/chunk-tasks/export-campaign/index.ts @@ -0,0 +1,647 @@ +/* eslint-disable no-cond-assign */ +import { format } from "fast-csv"; +import _ from "lodash"; + +import { DateTime } from "../../../../lib/datetime"; +import { + getDownloadUrl, + getUploadStream +} from "../../../../workers/exports/upload"; +import getExportCampaignContent from "../../../api/export-campaign"; +import { sendEmail } from "../../../mail"; +import { r } from "../../../models"; +import type { + ChunkTaskPayload, + ContactExportRow, + ExportCampaignTask, + ExportChunk, + FilteredContactsRow, + InteractionStepRecord, + MessageExportRow, + ProcessChunkTitlePayload, + ProcessExportChunksPayload, + ProcessMessagesChunkPayload, + ProgressTask, + ProgressTaskHelpers, + UploadCampaignContacts, + UploadCampaignMessages, + UploadContactsPayload +} from "./utils"; +import { + addProgressJob, + CHUNK_SIZE, + errToObj, + getChunkedContactsCte, + getContactCount, + getNotificationEmail, + isFilteredContact, + TASK_IDENTIFIER +} from "./utils"; + +export { TASK_IDENTIFIER }; + +const stepHasQuestion = (step: InteractionStepRecord) => + step.question && step.question.trim() !== ""; + +/** + * Returns map from interaction step ID --> question text (deduped where appropriate). + * @param {object[]} interactionSteps Array of interaction steps to work on. + */ +export const getUniqueQuestionsByStepId = ( + interactionSteps: InteractionStepRecord[] +) => { + const questionSteps = interactionSteps.filter(stepHasQuestion); + const duplicateQuestions = _.groupBy(questionSteps, (step) => step.question); + + return Object.keys(duplicateQuestions).reduce( + (allQuestions, questionText) => { + const steps = duplicateQuestions[questionText]; + const newUniqueQuestions = + steps.length > 1 + ? steps.reduce( + (collector, step, index) => + Object.assign(collector, { + [step.id]: `${questionText}_${index + 1}` + }), + {} + ) + : { [steps[0].id]: questionText }; + + return Object.assign(allQuestions, newUniqueQuestions); + }, + {} + ); +}; + +export const fetchExportData = async ( + campaignId: number, + requesterId: number +) => { + const contactsCount = await getContactCount(campaignId); + const notificationEmail = await getNotificationEmail(requesterId); + + const interactionSteps = await r + .reader("interaction_step") + .select("*") + .where({ campaign_id: campaignId }); + + const assignments = await r + .reader("assignment") + .where("campaign_id", campaignId) + .join("user", "user_id", "user.id") + .select( + "assignment.id as id", + "user.first_name", + "user.last_name", + "user.email", + "user.cell", + "user.assigned_cell" + ); + + const campaignVariableNames: string[] = await r + .reader<{ name: string }>("campaign_variable") + .where("campaign_id", campaignId) + .distinct("name") + .then((rows) => rows.map(({ name }) => name)); + + return { + contactsCount, + notificationEmail, + interactionSteps, + campaignVariableNames, + assignments + }; +}; + +const createContactRow = ( + contact: FilteredContactsRow | ContactExportRow, + campaignId: number, + campaignTitle: string +) => { + const contactIsFiltered = isFilteredContact(contact); + + const contactRow: Record = { + campaignId, + campaign: campaignTitle, + "contact[firstName]": contact.first_name, + "contact[lastName]": contact.last_name, + "contact[cell]": contact.cell, + "contact[zip]": contact.zip, + "contact[city]": contact.city || null, + "contact[state]": contact.state || null, + "contact[messageStatus]": contactIsFiltered + ? "removed" + : contact.message_status, + "contact[external_id]": contact.external_id + }; + + if (contactIsFiltered) + contactRow["contact[filtered_reason]"] = contact.filtered_reason; + + return contactRow; +}; + +const appendCustomFields = ( + contact: FilteredContactsRow | ContactExportRow, + contactRow: { [key: string]: any } +) => { + const customFields = JSON.parse(contact.custom_fields); + Object.keys(customFields).forEach((fieldName) => { + contactRow[`contact[${fieldName}]`] = customFields[fieldName]; + }); +}; + +const processFilteredContactsChunk = async ({ + campaignId, + campaignTitle, + lastContactId +}: ProcessChunkTitlePayload): Promise => { + const filteredRows: FilteredContactsRow[] = await r + .reader("filtered_contact") + .select("filtered_contact.*", "zip_code.city", "zip_code.state") + .leftJoin("zip_code", "filtered_contact.zip", "zip_code.zip") + .where({ campaign_id: campaignId }) + .whereRaw("filtered_contact.id > ?", [lastContactId]) + .orderBy("filtered_contact.id", "asc") + .limit(CHUNK_SIZE); + + const newLastContactId = filteredRows?.at(-1)?.id ?? 0; + + if (newLastContactId === 0) return false; + + const contacts = filteredRows.map((contact) => { + const contactRow = createContactRow(contact, campaignId, campaignTitle); + appendCustomFields(contact, contactRow); + return contactRow; + }); + + return { lastContactId: newLastContactId, data: contacts }; +}; + +export const processContactsChunk = async ( + { campaignId, campaignTitle, lastContactId }: ProcessChunkTitlePayload, + questionsById: { [key: string]: string }, + onlyOptOuts = false +): Promise => { + const contactsCte = onlyOptOuts + ? getChunkedContactsCte("is_opted_out = true") + : getChunkedContactsCte(); + + const { rows }: { rows: ContactExportRow[] } = await r.reader.raw( + ` + ${contactsCte} + select + campaign_contacts.*, + zip_code.city, + zip_code.state, + question_response.interaction_step_id, + question_response.value, + tags.tag_titles + from campaign_contacts + left join question_response + on question_response.campaign_contact_id = campaign_contacts.id + left join zip_code + on zip_code.zip = campaign_contacts.zip + left join ( + select + campaign_contact_tag.campaign_contact_id, + array_agg(tag.title) as tag_titles + from campaign_contact_tag + join tag + on campaign_contact_tag.tag_id = tag.id + group by campaign_contact_tag.campaign_contact_id + ) as tags + on tags.campaign_contact_id = campaign_contacts.id + order by campaign_contacts.id asc + ; + `, + [campaignId, lastContactId, CHUNK_SIZE] + ); + + if (rows.length === 0) return false; + + lastContactId = rows[rows.length - 1].id; + + const rowsByContactId = _.groupBy(rows, (row) => row.id); + const contacts = Object.keys(rowsByContactId).map((contactId) => { + // Use the first row for all the common campaign contact fields + const contact = rowsByContactId[contactId][0]; + const contactRow = createContactRow(contact, campaignId, campaignTitle); + appendCustomFields(contact, contactRow); + + // Append columns for question responses + Object.keys(questionsById).forEach((stepId) => { + const questionText = questionsById[stepId]; + const response = rowsByContactId[contactId].find( + (qr) => + parseInt(`${qr.interaction_step_id}`, 10) === parseInt(stepId, 10) + ); + + const responseValue = response ? response.value : ""; + contactRow[`question[${questionText}]`] = responseValue; + }); + + contactRow.tags = contact.tag_titles; + + return contactRow; + }); + + return { lastContactId, data: contacts }; +}; + +export const processMessagesChunk = async ({ + campaignId, + lastContactId, + campaignVariableNames +}: ProcessMessagesChunkPayload): Promise => { + const { rows }: { rows: MessageExportRow[] } = await r.reader.raw( + ` + select + message.campaign_contact_id, + message.assignment_id, + message.user_number, + message.contact_number, + message.is_from_contact, + message.text, + message.send_status, + message.created_at, + array_to_string(message.error_codes, '|') as error_codes, + message.num_segments, + message.num_media, + public.user.first_name, + public.user.last_name, + public.user.email, + public.user.cell as user_cell, + ( + select json_object(array_agg(name), array_agg(value)) + from campaign_variable + where id = ANY(message.campaign_variable_ids) + ) as campaign_variables + from message + left join public.user + on message.user_id = public.user.id + where campaign_contact_id in ( + select id + from campaign_contact + where + campaign_id = ? + and id > ? + and exists ( + select 1 + from message + where campaign_contact_id = campaign_contact.id + ) + order by + id asc + limit ? + ) + order by + campaign_contact_id asc, + message.created_at asc + ; + `, + [campaignId, lastContactId, CHUNK_SIZE] + ); + + if (rows.length === 0) return false; + + lastContactId = rows[rows.length - 1].campaign_contact_id; + + const campaignVariableColumns = (message: MessageExportRow) => + campaignVariableNames.reduce>( + (acc, variableName) => ({ + ...acc, + [`campaignVariable[${variableName}]`]: message.campaign_variables + ? message.campaign_variables[variableName] ?? null + : null + }), + {} + ); + + const messages = rows.map((message) => ({ + assignmentId: message.assignment_id, + userNumber: message.user_number, + contactNumber: message.contact_number, + isFromContact: message.is_from_contact, + numSegments: message.num_segments, + numMedia: message.num_media, + sendStatus: message.send_status, + errorCodes: message.error_codes, + attemptedAt: DateTime.fromJSDate(new Date(message.created_at)).toISO(), + text: message.text, + campaignId, + "texter[firstName]": message.first_name, + "texter[lastName]": message.last_name, + "texter[email]": message.email, + "texter[cell]": message.user_cell, + ...campaignVariableColumns(message) + })); + + return { lastContactId, data: messages }; +}; + +const setupUploadStreams = async ( + fileName: string, + helpers: ProgressTaskHelpers +) => { + const uploadStream = await getUploadStream(`${fileName}.csv`); + const writeStream = format({ + headers: true, + writeHeaders: true + }); + + uploadStream.on("error", (err) => { + helpers.logger.error(`error in ${fileName}UploadStream: `, errToObj(err)); + }); + + writeStream.on("error", (err) => { + helpers.logger.error(`error in ${fileName}WriteStream: `, errToObj(err)); + }); + + const uploadPromise = new Promise((resolve) => { + uploadStream.on("finish", resolve); + }); + + writeStream.pipe(uploadStream); + + return { writeStream, uploadPromise }; +}; + +const processChunks = async (payload: ProcessExportChunksPayload) => { + const { + processChunk, + campaignId, + campaignTitle = "", + uniqueQuestionsByStepId, + helpers, + contactsCount, + statusOffset = 0, + writeStream + } = payload; + + let lastContactId = 0; + let processed = 0; + let chunkResult; + + while ( + (chunkResult = await processChunk({ + campaignId, + campaignTitle, + lastContactId, + uniqueQuestionsByStepId + })) + ) { + lastContactId = chunkResult.lastContactId; + helpers.logger.debug( + `Processing export for ${campaignId} chunk part ${lastContactId}` + ); + + processed += CHUNK_SIZE; + await helpers.updateStatus( + Math.round((processed / contactsCount / 4) * 100) + statusOffset + ); + + for (const c of chunkResult.data) writeStream.write(c); + } +}; + +// eslint-disable-next-line max-len +const processAndUploadCampaignContacts: ExportCampaignTask = async ( + payload, + helpers +) => { + const { + interactionSteps, + fileNameKey, + onlyOptOuts, + campaignId, + contactsCount, + campaignTitle + } = payload; + const uniqueQuestionsByStepId = getUniqueQuestionsByStepId(interactionSteps); + + const campaignContactsKey = onlyOptOuts + ? `${fileNameKey}-optouts` + : fileNameKey; + + const { writeStream, uploadPromise } = await setupUploadStreams( + campaignContactsKey, + helpers + ); + + try { + await processChunks({ + processChunk: (params) => + processContactsChunk(params, uniqueQuestionsByStepId, onlyOptOuts), + campaignId, + campaignTitle, + uniqueQuestionsByStepId, + helpers, + contactsCount, + writeStream, + statusOffset: onlyOptOuts ? 75 : 25 + }); + } finally { + writeStream.end(); + } + + await uploadPromise; + return getDownloadUrl(`${campaignContactsKey}.csv`); +}; + +// eslint-disable-next-line max-len +const processAndUploadCampaignMessages: ExportCampaignTask = async ( + { fileNameKey, campaignId, contactsCount, campaignVariableNames }, + helpers +) => { + const messagesKey = `${fileNameKey}-messages`; + const { writeStream, uploadPromise } = await setupUploadStreams( + messagesKey, + helpers + ); + + try { + await processChunks({ + processChunk: (params) => + processMessagesChunk({ + campaignId, + lastContactId: params.lastContactId, + campaignVariableNames + }), + campaignId, + helpers, + contactsCount, + writeStream + }); + } finally { + writeStream.end(); + } + + await uploadPromise; + return getDownloadUrl(`${messagesKey}.csv`); +}; + +// eslint-disable-next-line max-len +const processAndUploadFilteredContacts: ExportCampaignTask = async ( + { fileNameKey, campaignId, campaignTitle }, + helpers +): Promise => { + const filteredContactsKey = `${fileNameKey}-filteredContacts`; + const { writeStream, uploadPromise } = await setupUploadStreams( + filteredContactsKey, + helpers + ); + + const countQuery = await r + .reader("filtered_contact") + .count("*") + .where({ campaign_id: campaignId }); + const contactsCount = countQuery[0].count as number; + + try { + await processChunks({ + processChunk: (params) => + processFilteredContactsChunk({ + campaignId, + campaignTitle, + lastContactId: params.lastContactId + }), + campaignId, + campaignTitle, + helpers, + contactsCount, + writeStream, + statusOffset: 75 + }); + } finally { + writeStream.end(); + } + + await uploadPromise; + return getDownloadUrl(`${filteredContactsKey}.csv`); +}; + +export interface ExportCampaignPayload extends ChunkTaskPayload { + isAutomatedExport?: boolean; + spokeOptions: { + campaign: boolean; + messages: boolean; + optOuts: boolean; + filteredContacts: boolean; + }; +} + +export const exportCampaign: ProgressTask = async ( + payload, + helpers +) => { + const { + campaignId, + campaignTitle, + requesterId, + isAutomatedExport = false, + spokeOptions + } = payload; + + const { + contactsCount, + notificationEmail, + interactionSteps, + campaignVariableNames + } = await fetchExportData(campaignId, requesterId); + + // Attempt upload to cloud storage + let fileNameKey = campaignTitle.replace(/ /g, "_").replace(/\//g, "_"); + + if (!isAutomatedExport) { + const timestamp = DateTime.local().toFormat("y-mm-d-hh-mm-ss"); + fileNameKey = `${fileNameKey}-${timestamp}`; + } + + const { + campaign: shouldExportCampaign, + filteredContacts: shouldExportFilteredContacts, + messages: shouldExportMessages, + optOuts: shouldExportOptOuts + } = spokeOptions; + + const campaign = { campaignId, campaignTitle, contactsCount }; + + const campaignExportUrl = shouldExportCampaign + ? await processAndUploadCampaignContacts( + { + fileNameKey, + ...campaign, + interactionSteps, + onlyOptOuts: false + }, + helpers + ) + : null; + + const campaignOptOutsExportUrl = shouldExportOptOuts + ? await processAndUploadCampaignContacts( + { + fileNameKey, + ...campaign, + interactionSteps, + onlyOptOuts: true + }, + helpers + ) + : null; + + const campaignMessagesExportUrl = shouldExportMessages + ? await processAndUploadCampaignMessages( + { + fileNameKey, + ...campaign, + campaignVariableNames + }, + helpers + ) + : null; + + const campaignFilteredContactsExportUrl = shouldExportFilteredContacts + ? await processAndUploadFilteredContacts( + { + fileNameKey, + ...campaign + }, + helpers + ) + : null; + + helpers.logger.debug("Waiting for streams to finish"); + + try { + if (!isAutomatedExport) { + const exportContent = await getExportCampaignContent( + { + campaignExportUrl, + campaignFilteredContactsExportUrl, + campaignOptOutsExportUrl, + campaignMessagesExportUrl + }, + campaignTitle + ); + await sendEmail({ + to: notificationEmail, + subject: `Export ready for ${campaignTitle}`, + html: exportContent + }); + } + helpers.logger.info(`Successfully exported ${campaignId}`); + } finally { + helpers.logger.info("Finishing export process"); + } +}; + +export const addExportCampaign = async (payload: ExportCampaignPayload) => + addProgressJob({ + identifier: TASK_IDENTIFIER, + payload, + taskSpec: { + queueName: "export" + } + }); diff --git a/src/server/tasks/chunk-tasks/export-campaign/utils.ts b/src/server/tasks/chunk-tasks/export-campaign/utils.ts new file mode 100644 index 000000000..92f511ce2 --- /dev/null +++ b/src/server/tasks/chunk-tasks/export-campaign/utils.ts @@ -0,0 +1,135 @@ +import type { CsvFormatterStream, FormatterRowArray } from "fast-csv"; + +import { config } from "../../../../config"; +import type { + CampaignContactRecord, + FilteredContactRecord, + InteractionStepRecord, + MessageRecord, + UserRecord +} from "../../../api/types"; +import type { KnownReturnProgressTask, ProgressTaskHelpers } from "../../utils"; +import type { + CampaignTitlePayload, + ChunkTaskPayload, + ContactTaskChunk, + ProcessChunkPayload +} from "../utils"; + +export { errToObj } from "../../../utils"; +export type { ProgressTask, ProgressTaskHelpers } from "../../utils"; +export { addProgressJob } from "../../utils"; +export type { ChunkTaskPayload, ProcessChunkTitlePayload } from "../utils"; +export { + getContactCount, + getChunkedContactsCte, + getNotificationEmail +} from "../utils"; +export type { InteractionStepRecord }; +export const TASK_IDENTIFIER = "export-campaign"; +export const CHUNK_SIZE = config.EXPORT_CAMPAIGN_CHUNK_SIZE; + +export interface ExportChunk extends ContactTaskChunk { + data: { [key: string]: any }[]; +} + +interface ContactCityState { + city: string; + state: string; +} + +export interface ContactExportRow + extends CampaignContactRecord, + ContactCityState { + interaction_step_id: number; + value: string; + tag_titles: string; +} + +export interface FilteredContactsRow + extends FilteredContactRecord, + ContactCityState {} + +export interface MessageExportRow + extends Pick< + MessageRecord, + | "campaign_contact_id" + | "assignment_id" + | "user_number" + | "contact_number" + | "is_from_contact" + | "text" + | "send_status" + | "created_at" + | "num_segments" + | "num_media" + >, + Pick { + error_codes: string; + user_cell: string; + campaign_variables: Record; +} + +interface CampaignVariablePayload { + campaignVariableNames: string[]; +} + +interface FilePayload { + fileNameKey: string; +} + +export type ExportCampaignTask

= KnownReturnProgressTask< + P, + string +>; + +interface UploadPayload + extends Omit, + FilePayload { + contactsCount: number; +} + +export interface UploadContactsPayload + extends UploadPayload, + CampaignTitlePayload {} + +export interface UploadCampaignContacts extends UploadContactsPayload { + campaignTitle: string; + interactionSteps: InteractionStepRecord[]; + onlyOptOuts: boolean; +} + +export interface ProcessMessagesChunkPayload + extends ProcessChunkPayload, + CampaignVariablePayload {} + +export interface UploadCampaignMessages + extends CampaignVariablePayload, + UploadPayload {} + +interface ProcessExportChunkPayload { + campaignId: number; + campaignTitle: string; + lastContactId: number; + uniqueQuestionsByStepId?: any; +} + +export interface ProcessExportChunksPayload { + processChunk: ( + payload: ProcessExportChunkPayload + ) => Promise; + campaignId: number; + campaignTitle?: string; + uniqueQuestionsByStepId?: any; + helpers: ProgressTaskHelpers; + contactsCount: number; + processedInitial?: number; + statusOffset?: number; + writeStream: CsvFormatterStream; +} + +export const isFilteredContact = ( + contact: FilteredContactsRow | ContactExportRow +): contact is FilteredContactsRow => { + return "filtered_reason" in contact; +}; diff --git a/src/server/tasks/chunk-tasks/index.ts b/src/server/tasks/chunk-tasks/index.ts new file mode 100644 index 000000000..ac1e5b82a --- /dev/null +++ b/src/server/tasks/chunk-tasks/index.ts @@ -0,0 +1,11 @@ +/* eslint-disable import/prefer-default-export */ +import type { TaskList } from "graphile-worker"; + +import { + exportCampaign, + TASK_IDENTIFIER as EXPORT_CAMPAIGN_IDENTIFIER +} from "./export-campaign"; + +export const taskList: TaskList = { + [EXPORT_CAMPAIGN_IDENTIFIER]: exportCampaign +}; diff --git a/src/server/tasks/chunk-tasks/utils.ts b/src/server/tasks/chunk-tasks/utils.ts new file mode 100644 index 000000000..0fc5169e9 --- /dev/null +++ b/src/server/tasks/chunk-tasks/utils.ts @@ -0,0 +1,64 @@ +import { r } from "../../models"; +import type { ProgressJobPayload } from "../utils"; + +export interface ContactTaskChunk { + lastContactId: number; +} + +export interface CampaignTitlePayload { + campaignTitle: string; +} + +export interface ProcessChunkPayload + extends ProgressJobPayload, + ContactTaskChunk {} + +export interface ProcessChunkTitlePayload + extends ProcessChunkPayload, + CampaignTitlePayload {} + +export interface ChunkTaskPayload + extends ProgressJobPayload, + CampaignTitlePayload { + requesterId: number; +} + +export const getContactCount = async (campaignId: number) => { + const [{ count }] = await r + .reader("campaign_contact") + .count() + .where({ campaign_id: campaignId }); + return count as number; +}; + +export const getNotificationEmail = async (requesterId: number) => { + const { email } = await r + .reader("user") + .first("email") + .where({ id: requesterId }); + return email; +}; + +/** + * + * @param filter a SQL condition to filter the contacts (ex. is_opted_out = true) + * @returns the condition prepended with "and" if it is defined and an empty string otherwise + */ +export const andSqlFilter = (filter?: string) => + filter ? `and ${filter}` : ""; + +/** + * Fetch a chunk of contacts for the campaign + * @param filter a SQL condition to filter the contacts (ex. is_opted_out = true) + * @returns CTE for the selected contacts which requires query params [campaignId, lastContactId, limit] + */ +export const getChunkedContactsCte = (filter?: string) => { + return ` + with campaign_contacts as ( + select * from campaign_contact + where campaign_id = ? and id > ? + ${andSqlFilter(filter)} + order by campaign_contact.id asc + limit ? + )`; +}; diff --git a/src/server/tasks/export-campaign.ts b/src/server/tasks/export-campaign.ts deleted file mode 100644 index 90a915116..000000000 --- a/src/server/tasks/export-campaign.ts +++ /dev/null @@ -1,755 +0,0 @@ -/* eslint-disable no-cond-assign */ -import { format } from "fast-csv"; -import _ from "lodash"; - -import { config } from "../../config"; -import { DateTime } from "../../lib/datetime"; -import { getDownloadUrl, getUploadStream } from "../../workers/exports/upload"; -import getExportCampaignContent from "../api/export-campaign"; -import type { - CampaignContactRecord, - FilteredContactRecord, - InteractionStepRecord, - MessageRecord, - UserRecord -} from "../api/types"; -import { sendEmail } from "../mail"; -import { r } from "../models"; -import { errToObj } from "../utils"; -import type { ProgressTask, ProgressTaskHelpers } from "./utils"; -import { addProgressJob } from "./utils"; - -export const TASK_IDENTIFIER = "export-campaign"; - -const CHUNK_SIZE = config.EXPORT_CAMPAIGN_CHUNK_SIZE; - -interface ExportChunk { - lastContactId: number; -} - -interface ContactsChunk extends ExportChunk { - contacts: { [key: string]: string }[]; -} - -interface MessagesChunk extends ExportChunk { - messages: { [key: string]: any }[]; -} - -const stepHasQuestion = (step: InteractionStepRecord) => - step.question && step.question.trim() !== ""; - -/** - * Returns map from interaction step ID --> question text (deduped where appropriate). - * @param {object[]} interactionSteps Array of interaction steps to work on. - */ -export const getUniqueQuestionsByStepId = ( - interactionSteps: InteractionStepRecord[] -) => { - const questionSteps = interactionSteps.filter(stepHasQuestion); - const duplicateQuestions = _.groupBy(questionSteps, (step) => step.question); - - return Object.keys(duplicateQuestions).reduce( - (allQuestions, questionText) => { - const steps = duplicateQuestions[questionText]; - const newUniqueQuestions = - steps.length > 1 - ? steps.reduce( - (collector, step, index) => - Object.assign(collector, { - [step.id]: `${questionText}_${index + 1}` - }), - {} - ) - : { [steps[0].id]: questionText }; - - return Object.assign(allQuestions, newUniqueQuestions); - }, - {} - ); -}; - -/** - * Fetch necessary job data from database. - * @param {object} job The export job object to fetch data for. - * Must have payload, campaign_id, and requester properties. - */ -export const fetchExportData = async ( - campaignId: number, - requesterId: number -) => { - const { title: campaignTitle } = await r - .reader("campaign") - .first("title") - .where({ id: campaignId }); - - const { email: notificationEmail } = await r - .reader("user") - .first("email") - .where({ id: requesterId }); - - const interactionSteps = await r - .reader("interaction_step") - .select("*") - .where({ campaign_id: campaignId }); - - const assignments = await r - .reader("assignment") - .where("campaign_id", campaignId) - .join("user", "user_id", "user.id") - .select( - "assignment.id as id", - "user.first_name", - "user.last_name", - "user.email", - "user.cell", - "user.assigned_cell" - ); - - const campaignVariableNames: string[] = await r - .reader<{ name: string }>("campaign_variable") - .where("campaign_id", campaignId) - .distinct("name") - .then((rows) => rows.map(({ name }) => name)); - - return { - campaignTitle, - notificationEmail, - interactionSteps, - campaignVariableNames, - assignments - }; -}; - -interface FilteredContactsRow extends FilteredContactRecord { - city: string; - state: string; -} - -const processFilteredContactsChunk = async ( - campaignId: number, - campaignTitle: string, - lastContactId = 0 -): Promise => { - const filteredRows: FilteredContactsRow[] = await r - .reader("filtered_contact") - .select("filtered_contact.*", "zip_code.city", "zip_code.state") - .leftJoin("zip_code", "filtered_contact.zip", "zip_code.zip") - .where({ campaign_id: campaignId }) - .whereRaw("filtered_contact.id > ?", [lastContactId]) - .orderBy("filtered_contact.id", "asc") - .limit(CHUNK_SIZE); - - const newLastContactId = filteredRows?.at(-1)?.id ?? 0; - - if (newLastContactId === 0) return false; - - const contacts = filteredRows.map((contact) => { - const contactRow: { [key: string]: any } = { - "contact[filtered_reason]": contact.filtered_reason, - campaignId, - campaign: campaignTitle, - "contact[firstName]": contact.first_name, - "contact[lastName]": contact.last_name, - "contact[cell]": contact.cell, - "contact[zip]": contact.zip, - "contact[city]": contact.city || null, - "contact[state]": contact.state || null, - "contact[messageStatus]": "removed", - "contact[external_id]": contact.external_id - }; - - // Append columns for custom fields - const customFields = JSON.parse(contact.custom_fields); - Object.keys(customFields).forEach((fieldName) => { - contactRow[`contact[${fieldName}]`] = customFields[fieldName]; - }); - - return contactRow; - }); - - return { lastContactId: newLastContactId, contacts }; -}; - -interface ContactExportRow extends CampaignContactRecord { - city: string; - state: string; - interaction_step_id: number; - value: string; - tag_titles: string; -} - -export const processContactsChunk = async ( - campaignId: number, - campaignTitle: string, - questionsById: { [key: string]: string }, - lastContactId = 0, - onlyOptOuts = false -): Promise => { - const { rows }: { rows: ContactExportRow[] } = await r.reader.raw( - ` - with campaign_contacts as ( - select * - from campaign_contact - where - campaign_id = ? - and id > ? - ${onlyOptOuts ? "and is_opted_out = true" : ""} - order by - campaign_contact.id asc - limit ? - ) - select - campaign_contacts.*, - zip_code.city, - zip_code.state, - question_response.interaction_step_id, - question_response.value, - tags.tag_titles - from campaign_contacts - left join question_response - on question_response.campaign_contact_id = campaign_contacts.id - left join zip_code - on zip_code.zip = campaign_contacts.zip - left join ( - select - campaign_contact_tag.campaign_contact_id, - array_agg(tag.title) as tag_titles - from campaign_contact_tag - join tag - on campaign_contact_tag.tag_id = tag.id - group by campaign_contact_tag.campaign_contact_id - ) as tags - on tags.campaign_contact_id = campaign_contacts.id - order by campaign_contacts.id asc - ; - `, - [campaignId, lastContactId, CHUNK_SIZE] - ); - - if (rows.length === 0) return false; - - lastContactId = rows[rows.length - 1].id; - - const rowsByContactId = _.groupBy(rows, (row) => row.id); - const contacts = Object.keys(rowsByContactId).map((contactId) => { - // Use the first row for all the common campaign contact fields - const contact = rowsByContactId[contactId][0]; - const contactRow: { [key: string]: any } = { - campaignId, - campaign: campaignTitle, - "contact[firstName]": contact.first_name, - "contact[lastName]": contact.last_name, - "contact[cell]": contact.cell, - "contact[zip]": contact.zip, - "contact[city]": contact.city || null, - "contact[state]": contact.state || null, - "contact[optOut]": contact.is_opted_out, - "contact[messageStatus]": contact.message_status, - "contact[external_id]": contact.external_id - }; - - // Append columns for custom fields - const customFields = JSON.parse(contact.custom_fields); - Object.keys(customFields).forEach((fieldName) => { - contactRow[`contact[${fieldName}]`] = customFields[fieldName]; - }); - - // Append columns for question responses - Object.keys(questionsById).forEach((stepId) => { - const questionText = questionsById[stepId]; - const response = rowsByContactId[contactId].find( - (qr) => - parseInt(`${qr.interaction_step_id}`, 10) === parseInt(stepId, 10) - ); - - const responseValue = response ? response.value : ""; - contactRow[`question[${questionText}]`] = responseValue; - }); - - contactRow.tags = contact.tag_titles; - - return contactRow; - }); - - return { lastContactId, contacts }; -}; - -interface MessageExportRow - extends Pick< - MessageRecord, - | "campaign_contact_id" - | "assignment_id" - | "user_number" - | "contact_number" - | "is_from_contact" - | "text" - | "send_status" - | "created_at" - | "num_segments" - | "num_media" - >, - Pick { - error_codes: string; - user_cell: string; - campaign_variables: Record; -} - -export const processMessagesChunk = async ( - campaignId: number, - campaignVariableNames: string[], - lastContactId = 0 -): Promise => { - const { rows }: { rows: MessageExportRow[] } = await r.reader.raw( - ` - select - message.campaign_contact_id, - message.assignment_id, - message.user_number, - message.contact_number, - message.is_from_contact, - message.text, - message.send_status, - message.created_at, - array_to_string(message.error_codes, '|') as error_codes, - message.num_segments, - message.num_media, - public.user.first_name, - public.user.last_name, - public.user.email, - public.user.cell as user_cell, - ( - select json_object(array_agg(name), array_agg(value)) - from campaign_variable - where id = ANY(message.campaign_variable_ids) - ) as campaign_variables - from message - left join public.user - on message.user_id = public.user.id - where campaign_contact_id in ( - select id - from campaign_contact - where - campaign_id = ? - and id > ? - and exists ( - select 1 - from message - where campaign_contact_id = campaign_contact.id - ) - order by - id asc - limit ? - ) - order by - campaign_contact_id asc, - message.created_at asc - ; - `, - [campaignId, lastContactId, CHUNK_SIZE] - ); - - if (rows.length === 0) return false; - - lastContactId = rows[rows.length - 1].campaign_contact_id; - - const campaignVariableColumns = (message: MessageExportRow) => - campaignVariableNames.reduce>( - (acc, variableName) => ({ - ...acc, - [`campaignVariable[${variableName}]`]: message.campaign_variables - ? message.campaign_variables[variableName] ?? null - : null - }), - {} - ); - - const messages = rows.map((message) => ({ - assignmentId: message.assignment_id, - userNumber: message.user_number, - contactNumber: message.contact_number, - isFromContact: message.is_from_contact, - numSegments: message.num_segments, - numMedia: message.num_media, - sendStatus: message.send_status, - errorCodes: message.error_codes, - attemptedAt: DateTime.fromJSDate(new Date(message.created_at)).toISO(), - text: message.text, - campaignId, - "texter[firstName]": message.first_name, - "texter[lastName]": message.last_name, - "texter[email]": message.email, - "texter[cell]": message.user_cell, - ...campaignVariableColumns(message) - })); - - return { lastContactId, messages }; -}; - -interface UploadCampaignContacts { - campaignTitle: string; - interactionSteps: Array; - contactsCount: number; - campaignId: number; - helpers: ProgressTaskHelpers; - fileNameKey: string; - onlyOptOuts: boolean; -} - -const processAndUploadCampaignContacts = async ({ - campaignTitle, - interactionSteps, - contactsCount, - campaignId, - helpers, - fileNameKey, - onlyOptOuts -}: UploadCampaignContacts): Promise => { - const uniqueQuestionsByStepId = getUniqueQuestionsByStepId(interactionSteps); - - const campaignContactsKey = onlyOptOuts - ? `${fileNameKey}-optouts` - : fileNameKey; - - const campaignContactsUploadStream = await getUploadStream( - `${campaignContactsKey}.csv` - ); - - const campaignContactsWriteStream = format({ - headers: true, - writeHeaders: true - }); - - campaignContactsUploadStream.on("error", (err) => { - helpers.logger.error( - "error in campaignContactsUploadStream: ", - errToObj(err) - ); - }); - - campaignContactsWriteStream.on("error", (err) => { - helpers.logger.error( - "error in campaignContactsWriteStream: ", - errToObj(err) - ); - }); - - const campaignContactsUploadPromise = new Promise((resolve) => { - campaignContactsUploadStream.on("finish", resolve); - }); - - campaignContactsWriteStream.pipe(campaignContactsUploadStream); - - // Contact rows - let lastContactId; - let processed = 0; - try { - let chunkContactResult: ContactsChunk | false; - lastContactId = 0; - processed = 0; - while ( - (chunkContactResult = await processContactsChunk( - campaignId, - campaignTitle, - uniqueQuestionsByStepId, - lastContactId, - onlyOptOuts - )) - ) { - lastContactId = chunkContactResult.lastContactId; - helpers.logger.debug( - `Processing contact export for ${campaignId} chunk part ${lastContactId}` - ); - processed += CHUNK_SIZE; - await helpers.updateStatus( - Math.round((processed / contactsCount / 4) * 100) + - (onlyOptOuts ? 75 : 25) - ); - for (const c of chunkContactResult.contacts) { - campaignContactsWriteStream.write(c); - } - } - } finally { - campaignContactsWriteStream.end(); - } - - await campaignContactsUploadPromise; - - return getDownloadUrl(`${campaignContactsKey}.csv`); -}; - -interface UploadCampaignMessages { - fileNameKey: string; - contactsCount: number; - helpers: ProgressTaskHelpers; - campaignId: number; - campaignVariableNames: string[]; -} - -const processAndUploadCampaignMessages = async ({ - fileNameKey, - contactsCount, - helpers, - campaignId, - campaignVariableNames -}: UploadCampaignMessages): Promise => { - const messagesKey = `${fileNameKey}-messages`; - const messagesUploadStream = await getUploadStream(`${messagesKey}.csv`); - const messagesWriteStream = format({ - headers: true, - writeHeaders: true - }); - - messagesUploadStream.on("error", (err) => { - helpers.logger.error("error in messagesUploadStream: ", errToObj(err)); - }); - messagesWriteStream.on("error", (err) => { - helpers.logger.error("error in messagesWriteStream: ", errToObj(err)); - }); - - const messagesUploadPromise = new Promise((resolve) => { - messagesUploadStream.on("finish", resolve); - }); - - messagesWriteStream.pipe(messagesUploadStream); - - // Message rows - let lastContactId; - let processed = 0; - try { - let chunkMessageResult: MessagesChunk | false; - lastContactId = 0; - while ( - (chunkMessageResult = await processMessagesChunk( - campaignId, - campaignVariableNames, - lastContactId - )) - ) { - lastContactId = chunkMessageResult.lastContactId; - helpers.logger.debug( - `Processing message export for ${campaignId} chunk part ${lastContactId}` - ); - processed += CHUNK_SIZE; - await helpers.updateStatus( - Math.round((processed / contactsCount / 4) * 100) - ); - for (const m of chunkMessageResult.messages) { - messagesWriteStream.write(m); - } - } - } finally { - messagesWriteStream.end(); - } - - await messagesUploadPromise; - - return getDownloadUrl(`${messagesKey}.csv`); -}; - -interface UploadFilteredContacts { - fileNameKey: string; - helpers: ProgressTaskHelpers; - campaignId: number; - campaignTitle: string; -} - -const processAndUploadFilteredContacts = async ({ - fileNameKey, - helpers, - campaignId, - campaignTitle -}: UploadFilteredContacts): Promise => { - const filteredContactsKey = `${fileNameKey}-filteredContacts`; - const filteredContactsUploadStream = await getUploadStream( - `${filteredContactsKey}.csv` - ); - const filteredContactsWriteStream = format({ - headers: true, - writeHeaders: true - }); - - filteredContactsUploadStream.on("error", (err) => { - helpers.logger.error( - "error in filteredContactsUploadStream: ", - errToObj(err) - ); - }); - filteredContactsWriteStream.on("error", (err) => { - helpers.logger.error( - "error in filteredContactsWriteStream: ", - errToObj(err) - ); - }); - - const filteredContactsUploadPromise = new Promise((resolve) => { - filteredContactsUploadStream.on("finish", resolve); - }); - - const countQuery = await r - .reader("filtered_contact") - .count("*") - .where({ campaign_id: campaignId }); - const contactsCount = countQuery[0].count as number; - - filteredContactsWriteStream.pipe(filteredContactsUploadStream); - - let lastContactId; - let processed = 0; - try { - let chunkContactResult: ContactsChunk | false; - lastContactId = 0; - processed = 0; - while ( - (chunkContactResult = await processFilteredContactsChunk( - campaignId, - campaignTitle, - lastContactId - )) - ) { - lastContactId = chunkContactResult.lastContactId; - helpers.logger.debug( - `Processing filtered contact export for ${campaignId} chunk part ${lastContactId}` - ); - processed += CHUNK_SIZE; - await helpers.updateStatus( - Math.round((processed / contactsCount / 4) * 100) + 75 - ); - for (const c of chunkContactResult.contacts) { - filteredContactsWriteStream.write(c); - } - } - } finally { - filteredContactsWriteStream.end(); - } - - await filteredContactsUploadPromise; - - return getDownloadUrl(`${filteredContactsKey}.csv`); -}; - -export interface ExportCampaignPayload { - campaignId: number; - requesterId: number; - isAutomatedExport?: boolean; - spokeOptions: { - campaign: boolean; - messages: boolean; - optOuts: boolean; - filteredContacts: boolean; - }; -} - -export const exportCampaign: ProgressTask = async ( - payload, - helpers -) => { - const { - campaignId, - requesterId, - isAutomatedExport = false, - spokeOptions - } = payload; - const { - campaignTitle, - notificationEmail, - interactionSteps, - campaignVariableNames - } = await fetchExportData(campaignId, requesterId); - - const countQueryResult = await r - .reader("campaign_contact") - .count("*") - .where({ campaign_id: campaignId }); - const contactsCount = countQueryResult[0].count as number; - - // Attempt upload to cloud storage - let fileNameKey = campaignTitle.replace(/ /g, "_").replace(/\//g, "_"); - - if (!isAutomatedExport) { - const timestamp = DateTime.local().toFormat("y-mm-d-hh-mm-ss"); - fileNameKey = `${fileNameKey}-${timestamp}`; - } - - const { - campaign: shouldExportCampaign, - filteredContacts: shouldExportFilteredContacts, - messages: shouldExportMessages, - optOuts: shouldExportOptOuts - } = spokeOptions; - - const campaignExportUrl = shouldExportCampaign - ? await processAndUploadCampaignContacts({ - fileNameKey, - campaignId, - campaignTitle, - contactsCount, - helpers, - interactionSteps, - onlyOptOuts: false - }) - : null; - - const campaignOptOutsExportUrl = shouldExportOptOuts - ? await processAndUploadCampaignContacts({ - fileNameKey, - campaignId, - campaignTitle, - contactsCount, - helpers, - interactionSteps, - onlyOptOuts: true - }) - : null; - - const campaignMessagesExportUrl = shouldExportMessages - ? await processAndUploadCampaignMessages({ - fileNameKey, - campaignId, - contactsCount, - helpers, - campaignVariableNames - }) - : null; - - const campaignFilteredContactsExportUrl = shouldExportFilteredContacts - ? await processAndUploadFilteredContacts({ - fileNameKey, - campaignId, - campaignTitle, - helpers - }) - : null; - - helpers.logger.debug("Waiting for streams to finish"); - - try { - if (!isAutomatedExport) { - const exportContent = await getExportCampaignContent( - { - campaignExportUrl, - campaignFilteredContactsExportUrl, - campaignOptOutsExportUrl, - campaignMessagesExportUrl - }, - campaignTitle - ); - await sendEmail({ - to: notificationEmail, - subject: `Export ready for ${campaignTitle}`, - html: exportContent - }); - } - helpers.logger.info(`Successfully exported ${campaignId}`); - } finally { - helpers.logger.info("Finishing export process"); - } -}; - -export const addExportCampaign = async (payload: ExportCampaignPayload) => - addProgressJob({ - identifier: TASK_IDENTIFIER, - payload, - taskSpec: { - queueName: "export" - } - }); diff --git a/src/server/tasks/utils.ts b/src/server/tasks/utils.ts index 30d785ac3..3a0428046 100644 --- a/src/server/tasks/utils.ts +++ b/src/server/tasks/utils.ts @@ -102,15 +102,19 @@ export interface ProgressTaskHelpers extends JobHelpers { updateResult(result: Record): Promise; } -export type ProgressTask

= ( +export type KnownReturnProgressTask

= ( payload: P, helpers: ProgressTaskHelpers -) => void | Promise; +) => R | Promise; + +export type ProgressTask

= KnownReturnProgressTask; export interface ProgressTaskOptions { removeOnComplete: boolean; } +export type ProgressTaskList

= Record>; + export const wrapProgressTask =

( task: ProgressTask

, options: ProgressTaskOptions @@ -153,3 +157,14 @@ export const wrapProgressTask =

( throw err; } }; + +export const wrapProgressTaskList = async

( + list: ProgressTaskList

+) => { + return Object.entries(list).reduce((acc, [key, task]) => { + acc[key] = wrapProgressTask(task, { + removeOnComplete: true + }); + return acc; + }, {} as ProgressTaskList

); +}; diff --git a/src/server/worker.ts b/src/server/worker.ts index e5f6d4b4d..927538aa2 100644 --- a/src/server/worker.ts +++ b/src/server/worker.ts @@ -15,10 +15,7 @@ import { schedules as campaignBuilderSchedules, taskList as campaignBuilderTaskList } from "./tasks/campaign-builder"; -import { - exportCampaign, - TASK_IDENTIFIER as exportCampaignIdentifier -} from "./tasks/export-campaign"; +import { taskList as chunkTaskList } from "./tasks/chunk-tasks"; import { exportForVan, TASK_IDENTIFIER as exportForVanIdentifier @@ -65,7 +62,7 @@ import syncContactQuestionResponse from "./tasks/sync-contact-question-response" import syncSlackTeamMembers from "./tasks/sync-slack-team-members"; import { trollPatrol, trollPatrolForOrganization } from "./tasks/troll-patrol"; import updateOrgMessageUsage from "./tasks/update-org-message-usage"; -import { wrapProgressTask } from "./tasks/utils"; +import { wrapProgressTask, wrapProgressTaskList } from "./tasks/utils"; const logFactory: LogFunctionFactory = (scope) => (level, message, meta) => logger.log({ level, message, ...meta, ...scope }); @@ -101,9 +98,6 @@ export const getWorker = async (attempt = 0): Promise => { // eslint-disable-next-line max-len [QUEUE_AUTOSEND_ORGANIZATION_INITIALS_TASK_IDENTIFIER]: queueAutoSendOrganizationInitials, [PAUSE_AUTOSENDING_CAMPAIGNS_TASK_IDENTIFIER]: pauseAutosendingCampaigns, - [exportCampaignIdentifier]: wrapProgressTask(exportCampaign, { - removeOnComplete: true - }), [exportForVanIdentifier]: wrapProgressTask(exportForVan, { removeOnComplete: true }), @@ -115,7 +109,8 @@ export const getWorker = async (attempt = 0): Promise => { }), [exportOptOutsIdentifier]: exportOptOuts, ...ngpVanTaskList, - ...campaignBuilderTaskList + ...campaignBuilderTaskList, + ...wrapProgressTaskList(chunkTaskList) }; if (!workerSemaphore) {