From d7a048b6cfda1a9903f4f7feeb4cd78ed60c3edd Mon Sep 17 00:00:00 2001 From: Aashish John Date: Mon, 26 Aug 2024 21:55:02 -0400 Subject: [PATCH] refactor(second-pass): implement as task --- libs/gql-schema/schema.ts | 4 +- .../src/graphql/campaign-operations.graphql | 8 +- src/config.js | 4 + .../components/OperationDialog.tsx | 7 +- src/containers/CampaignList/index.tsx | 23 ++- src/schema.graphql | 4 +- src/server/api/lib/mark-second-pass.ts | 17 ++ src/server/api/root-mutations.ts | 143 ++++---------- src/server/lib/templates/mark-second-pass.tsx | 40 ++++ src/server/tasks/chunk-tasks/index.ts | 7 +- .../chunk-tasks/mark-second-pass/index.ts | 182 ++++++++++++++++++ .../chunk-tasks/mark-second-pass/utils.ts | 29 +++ src/server/tasks/chunk-tasks/utils.ts | 10 +- src/server/worker.ts | 4 +- 14 files changed, 358 insertions(+), 124 deletions(-) create mode 100644 src/server/api/lib/mark-second-pass.ts create mode 100644 src/server/lib/templates/mark-second-pass.tsx create mode 100644 src/server/tasks/chunk-tasks/mark-second-pass/index.ts create mode 100644 src/server/tasks/chunk-tasks/mark-second-pass/utils.ts diff --git a/libs/gql-schema/schema.ts b/libs/gql-schema/schema.ts index d18c4867a..678910f40 100644 --- a/libs/gql-schema/schema.ts +++ b/libs/gql-schema/schema.ts @@ -335,11 +335,11 @@ const rootSchema = ` requestTexts(count: Int!, email: String!, organizationId: String!, preferredTeamId: String!): String! releaseMessages(campaignId: String!, target: ReleaseActionTarget!, ageInHours: Float): String! releaseAllUnhandledReplies(organizationId: String!, ageInHours: Float, releaseOnRestricted: Boolean, limitToCurrentlyTextableContacts: Boolean): ReleaseAllUnhandledRepliesResult! - markForSecondPass(campaignId: String!, input: SecondPassInput!): String! + markForSecondPass(campaignId: String!, campaignTitle: String!, input: SecondPassInput!): String! startAutosending(campaignId: String!): Campaign! pauseAutosending(campaignId: String!): Campaign! updateCampaignAutosendingLimit(campaignId: String!, limit: Int): Campaign! - unMarkForSecondPass(campaignId: String!): String! + unMarkForSecondPass(campaignId: String!, campaignTitle: String!): String! deleteNeedsMessage(campaignId: String!): String! insertLinkDomain(organizationId: String!, domain: String!, maxUsageCount: Int!): LinkDomain! updateLinkDomain(organizationId: String!, domainId: String!, payload: UpdateLinkDomain!): LinkDomain! diff --git a/libs/spoke-codegen/src/graphql/campaign-operations.graphql b/libs/spoke-codegen/src/graphql/campaign-operations.graphql index 92d7e820b..71541de35 100644 --- a/libs/spoke-codegen/src/graphql/campaign-operations.graphql +++ b/libs/spoke-codegen/src/graphql/campaign-operations.graphql @@ -51,12 +51,12 @@ mutation deleteNeedsMessage($campaignId: String!) { deleteNeedsMessage(campaignId: $campaignId) } -mutation markForSecondPass($campaignId: String!, $input: SecondPassInput!) { - markForSecondPass(campaignId: $campaignId, input: $input) +mutation markForSecondPass($campaignId: String!, $campaignTitle: String!, $input: SecondPassInput!) { + markForSecondPass(campaignId: $campaignId, campaignTitle: $campaignTitle, input: $input) } -mutation unMarkForSecondPass($campaignId: String!) { - unMarkForSecondPass(campaignId: $campaignId) +mutation unMarkForSecondPass($campaignId: String!, $campaignTitle: String!) { + unMarkForSecondPass(campaignId: $campaignId, campaignTitle: $campaignTitle) } mutation toggleAutoAssign($campaignId: String!, $enabled: Boolean!) { diff --git a/src/config.js b/src/config.js index fe2895e6e..705826afb 100644 --- a/src/config.js +++ b/src/config.js @@ -736,6 +736,10 @@ const validators = { "A JSON blob passed directly to express-basic-auth for locking campaign previews", default: undefined }), + MARK_SECOND_PASS_CHUNK_SIZE: num({ + desc: "Chunk size to use when marking a campaign for a second pass", + default: 1000 + }), SKIP_TWILIO_VALIDATION: bool({ desc: "Whether to bypass Twilio header validation altogether.", default: false diff --git a/src/containers/CampaignList/components/OperationDialog.tsx b/src/containers/CampaignList/components/OperationDialog.tsx index c15cd05f7..42bd5441d 100644 --- a/src/containers/CampaignList/components/OperationDialog.tsx +++ b/src/containers/CampaignList/components/OperationDialog.tsx @@ -35,7 +35,6 @@ export interface OperationDialogProps extends OperationDialogBodyProps { export const OperationDialogBody = (props: OperationDialogBodyProps) => { const { inProgress, finished, executing, error, setInProgress } = props; - const { name: operationName, campaign } = inProgress; const operationDefinition = dialogOperations[operationName]; @@ -89,6 +88,12 @@ export const OperationDialogBody = (props: OperationDialogBodyProps) => { return (

{operationDefinition?.body(campaign)}

+ {campaign.hasUnsentInitialMessages && ( +

+ WARNING: This campaign still has contacts with unsent initial + messages{" "} +

+ )}

To read about best practices for second passes, head{" "} = (props) => { break; } case isMarkForSecondPass(inProgress): { - const { excludeNewer, hours } = inProgress.payload; + const { + excludeNewer, + excludeRecentlyTexted, + hours, + days + } = inProgress.payload; + const { id: campaignId, title: campaignTitle } = campaign; + const excludeAgeInHours = excludeRecentlyTexted + ? (days || 0) * 24 + (hours || 0) + : undefined; + const { data, errors } = await markCampaign({ variables: { - campaignId: campaign.id, - input: { excludeNewer, excludeAgeInHours: hours } + campaignId, + campaignTitle, + input: { + excludeNewer, + excludeAgeInHours + } } }); @@ -141,8 +155,9 @@ export const CampaignList: React.FC = (props) => { break; } case isUnMarkForSecondPass(inProgress): { + const { id: campaignId, title: campaignTitle } = campaign; const { data, errors } = await unmarkCampaign({ - variables: { campaignId: campaign.id } + variables: { campaignId, campaignTitle } }); setStateAfterOperation(data?.unMarkForSecondPass, errors); break; diff --git a/src/schema.graphql b/src/schema.graphql index 15cf5bdc1..f9219a54e 100644 --- a/src/schema.graphql +++ b/src/schema.graphql @@ -301,11 +301,11 @@ type RootMutation { requestTexts(count: Int!, email: String!, organizationId: String!, preferredTeamId: String!): String! releaseMessages(campaignId: String!, target: ReleaseActionTarget!, ageInHours: Float): String! releaseAllUnhandledReplies(organizationId: String!, ageInHours: Float, releaseOnRestricted: Boolean, limitToCurrentlyTextableContacts: Boolean): ReleaseAllUnhandledRepliesResult! - markForSecondPass(campaignId: String!, input: SecondPassInput!): String! + markForSecondPass(campaignId: String!, campaignTitle: String!, input: SecondPassInput!): String! startAutosending(campaignId: String!): Campaign! pauseAutosending(campaignId: String!): Campaign! updateCampaignAutosendingLimit(campaignId: String!, limit: Int): Campaign! - unMarkForSecondPass(campaignId: String!): String! + unMarkForSecondPass(campaignId: String!, campaignTitle: String!): String! deleteNeedsMessage(campaignId: String!): String! insertLinkDomain(organizationId: String!, domain: String!, maxUsageCount: Int!): LinkDomain! updateLinkDomain(organizationId: String!, domainId: String!, payload: UpdateLinkDomain!): LinkDomain! diff --git a/src/server/api/lib/mark-second-pass.ts b/src/server/api/lib/mark-second-pass.ts new file mode 100644 index 000000000..c2673e7b4 --- /dev/null +++ b/src/server/api/lib/mark-second-pass.ts @@ -0,0 +1,17 @@ +/* eslint-disable import/prefer-default-export */ +import type { User } from "@spoke/spoke-codegen"; +import { r } from "src/server/models"; + +import { accessRequired } from "../errors"; + +export const getSecondPassCampaign = async (campaignId: number, user: User) => { + // verify permissions + const campaign = await r + .knex("campaign") + .where({ id: campaignId }) + .first(["organization_id", "is_archived", "autosend_status"]); + + const organizationId = campaign.organization_id; + await accessRequired(user, organizationId, "ADMIN", true); + return campaign; +}; diff --git a/src/server/api/root-mutations.ts b/src/server/api/root-mutations.ts index 230e6d080..8810e6031 100644 --- a/src/server/api/root-mutations.ts +++ b/src/server/api/root-mutations.ts @@ -31,6 +31,7 @@ import { cacheableData, r } from "../models"; import { getUserById } from "../models/cacheable_queries"; import { Notifications, sendUserNotification } from "../notifications"; import { addExportCampaign } from "../tasks/chunk-tasks/export-campaign"; +import { addMarkSecondPass } from "../tasks/chunk-tasks/mark-second-pass"; import { addExportForVan } from "../tasks/export-for-van"; import { TASK_IDENTIFIER as exportOptOutsIdentifier } from "../tasks/export-opt-outs"; import { addFilterLandlines } from "../tasks/filter-landlines"; @@ -63,6 +64,7 @@ import { markAutosendingPaused, unqueueAutosending } from "./lib/campaign"; +import { getSecondPassCampaign } from "./lib/mark-second-pass"; import { saveNewIncomingMessage } from "./lib/message-sending"; import { processNumbers } from "./lib/opt-out"; import { sendMessage } from "./lib/send-message"; @@ -1874,18 +1876,12 @@ const rootMutations = { markForSecondPass: async ( _ignore, - { campaignId, input: { excludeAgeInHours, excludeNewer } }, + { campaignId, campaignTitle, input: { excludeAgeInHours, excludeNewer } }, { user } ) => { // verify permissions - const campaign = await r - .knex("campaign") - .where({ id: parseInt(campaignId, 10) }) - .first(["organization_id", "is_archived", "autosend_status"]); - - const organizationId = campaign.organization_id; - - await accessRequired(user, organizationId, "ADMIN", true); + const numCampaignId = parseInt(campaignId, 10); + const campaign = await getSecondPassCampaign(campaignId, user); if (!["complete", "unstarted"].includes(campaign.autosend_status)) { throw new Error( @@ -1895,61 +1891,21 @@ const rootMutations = { ); } - await r - .knex("campaign") - .update({ autosend_status: "unstarted" }) - .where({ id: parseInt(campaignId, 10) }); - - const queryArgs = [parseInt(campaignId, 10)]; - if (excludeAgeInHours) { - queryArgs.push(parseFloat(excludeAgeInHours)); - } - - const excludeNewerSql = ` - and not exists ( - select - cell - from - campaign_contact as newer_contact - where - newer_contact.cell = current_contact.cell - and newer_contact.created_at > current_contact.created_at - ) - `; - - /** - * "Mark Campaign for Second Pass", will only mark contacts for a second - * pass that do not have a more recently created membership in another campaign. - * Using SQL injection to avoid passing archived as a binding - * Should help with guaranteeing partial index usage - */ - const updateSql = ` - update - campaign_contact as current_contact - set - message_status = 'needsMessage' - where current_contact.campaign_id = ? - and current_contact.message_status = 'messaged' - and current_contact.archived = ${campaign.is_archived} - ${excludeNewer ? excludeNewerSql : ""} - and not exists ( - select 1 - from message - where current_contact.id = message.campaign_contact_id - and is_from_contact = true - ) - ${ - excludeAgeInHours - ? "and current_contact.updated_at < now() - interval '?? hour'" - : "" - } - ; - `; + const [{ count: contactsCount }] = await r + .knex("campaign_contact") + .where({ campaign_id: numCampaignId, message_status: "messaged" }) + .count(); - const updateResultRaw = await r.knex.raw(updateSql, queryArgs); - const updateResult = updateResultRaw.rowCount; + addMarkSecondPass({ + organizationId: campaign.organization_id, + campaignId, + campaignTitle, + requesterId: user.id, + excludeAgeInHours, + excludeNewer + }); - return `Marked ${updateResult} campaign contacts for a second pass.`; + return `Queuing ${contactsCount} campaign contacts for a second pass. You'll receive an email when the second pass is fully marked!`; }, startAutosending: async (_ignore, { campaignId }, { loaders, user }) => { @@ -2044,53 +2000,30 @@ const rootMutations = { return updatedCampaign; }, - unMarkForSecondPass: async (_ignore, { campaignId }, { user }) => { + unMarkForSecondPass: async ( + _ignore, + { campaignId, campaignTitle }, + { user } + ) => { // verify permissions - const campaign = await r - .knex("campaign") - .where({ id: parseInt(campaignId, 10) }) - .first(["organization_id", "is_archived"]); + const numCampaignId = parseInt(campaignId, 10); + const campaign = await getSecondPassCampaign(numCampaignId, user); - const organizationId = campaign.organization_id; - - await accessRequired(user, organizationId, "ADMIN", true); - - /** - * "Un-Mark Campaign for Second Pass", will only mark contacts as messaged - * if they are currently needsMessage and have been sent a message and have not replied - * - * Using SQL injection to avoid passing archived as a binding - * Should help with guaranteeing partial index usage - */ - const updateResultRaw = await r.knex.raw( - ` - update - campaign_contact - set - message_status = 'messaged' - where campaign_contact.campaign_id = ? - and campaign_contact.message_status = 'needsMessage' - and campaign_contact.archived = ${campaign.is_archived} - and exists ( - select 1 - from message - where message.campaign_contact_id = campaign_contact.id - and is_from_contact = false - ) - and not exists ( - select 1 - from message - where message.campaign_contact_id = campaign_contact.id - and is_from_contact = true - ) - ; - `, - [parseInt(campaignId, 10)] - ); + // The precise count unmarked may be lower if some contacts never got a first message + const [{ count: contactsCount }] = await r + .knex("campaign_contact") + .where({ campaign_id: numCampaignId, message_status: "needsMessage" }) + .count(); - const updateResult = updateResultRaw.rowCount; + addMarkSecondPass({ + unmark: true, + organizationId: campaign.organization_id, + campaignId, + campaignTitle, + requesterId: user.id + }); - return `Un-Marked ${updateResult} campaign contacts for a second pass.`; + return `Queuing ${contactsCount} campaign contacts to remove second pass marking. You'll receive an email when this is complete!`; }, deleteNeedsMessage: async (_ignore, { campaignId }, { user }) => { diff --git a/src/server/lib/templates/mark-second-pass.tsx b/src/server/lib/templates/mark-second-pass.tsx new file mode 100644 index 000000000..7399e229e --- /dev/null +++ b/src/server/lib/templates/mark-second-pass.tsx @@ -0,0 +1,40 @@ +import React from "react"; +import ReactDOMServer from "react-dom/server"; + +import { config } from "../../../config"; + +export interface MarkSecondPassProps { + campaignId: number; + organizationId: number; + campaignTitle: string; + unmark?: boolean; +} + +const MarkSecondPass: React.FC = ({ + campaignId, + campaignTitle, + organizationId, + unmark +}) => { + const titleLink = ( + + {campaignTitle} + + ); + return ( + <> +

+ Your second pass {unmark ? "un" : ""}marking for {titleLink} is + complete! Navigate back to the campaign here: {titleLink} +

+

-- The Spoke Rewired Team

+ + ); +}; + +export const getContent = async (props: MarkSecondPassProps) => { + const template = ; + return ReactDOMServer.renderToStaticMarkup(template); +}; diff --git a/src/server/tasks/chunk-tasks/index.ts b/src/server/tasks/chunk-tasks/index.ts index ac1e5b82a..65803fe31 100644 --- a/src/server/tasks/chunk-tasks/index.ts +++ b/src/server/tasks/chunk-tasks/index.ts @@ -5,7 +5,12 @@ import { exportCampaign, TASK_IDENTIFIER as EXPORT_CAMPAIGN_IDENTIFIER } from "./export-campaign"; +import { + markSecondPass, + TASK_IDENTIFIER as MARK_SECOND_PASS_IDENTIFIER +} from "./mark-second-pass"; export const taskList: TaskList = { - [EXPORT_CAMPAIGN_IDENTIFIER]: exportCampaign + [EXPORT_CAMPAIGN_IDENTIFIER]: exportCampaign, + [MARK_SECOND_PASS_IDENTIFIER]: markSecondPass }; diff --git a/src/server/tasks/chunk-tasks/mark-second-pass/index.ts b/src/server/tasks/chunk-tasks/mark-second-pass/index.ts new file mode 100644 index 000000000..b8232a54d --- /dev/null +++ b/src/server/tasks/chunk-tasks/mark-second-pass/index.ts @@ -0,0 +1,182 @@ +import { getContent } from "../../../lib/templates/mark-second-pass"; +import { sendEmail } from "../../../mail"; +import { r } from "../../../models"; +import type { + MarkSecondPassPayload, + ProcessSecondPassChunkPayload, + ProgressTask +} from "./utils"; +import { + addProgressJob, + CHUNK_SIZE, + getChunkedContactsCte, + getContactCount, + getNotificationEmail, + TASK_IDENTIFIER +} from "./utils"; + +export { TASK_IDENTIFIER }; + +export const processChunk = async ( + payload: ProcessSecondPassChunkPayload +): Promise => { + const { + campaignId, + lastContactId = 0, + unmark, + excludeNewer, + excludeAgeInHours + } = payload; + + // don't unmark contacts where a first msg was never sent + const msgExists = ` + and exists ( + select 1 + from message + where message.campaign_contact_id = cc.id + ) + `; + + const markFilters = ` + ${ + excludeNewer + ? `and not exists ( + select 1 from campaign_contact as newer_contact + where newer_contact.cell = cc.cell + and newer_contact.id > cc.id + )` + : "" + } + ${ + excludeAgeInHours + ? `and not exists ( + select 1 from message where campaign_contact_id = cc.id + and created_at > now() - interval '?? hour' + )` + : "" + }`; + + const queryArgs: (string | number)[] = [campaignId, lastContactId]; + if (excludeAgeInHours) queryArgs.push(excludeAgeInHours); + queryArgs.push(CHUNK_SIZE); + + const contactsFilter = ` + message_status = ${unmark ? "'needsMessage'" : "'messaged'"} + ${unmark ? msgExists : markFilters} + `; + + const contactsCte = getChunkedContactsCte(contactsFilter); + const { + rows: [{ max: maxContactId }] + } = await r.reader.raw( + ` + ${contactsCte}, + mark_contacts as ( + update campaign_contact set message_status = ${ + unmark ? "'messaged'" : "'needsMessage'" + } + where id in ( + select id from campaign_contacts + ) + returning id + ) + + -- this ensures that the mark cte will actually run, but return the max contact id overall + select max(id) from ( + select(id) from campaign_contacts + union + select(id) from mark_contacts + ) ctes + `, + queryArgs + ); + return maxContactId; +}; + +type ProcessContactsPayload = Omit< + MarkSecondPassPayload, + "organizationId" | "requesterId" | "campaignTitle" +>; +const processContacts: ProgressTask = async ( + payload, + helpers +) => { + const { campaignId, unmark, excludeNewer, excludeAgeInHours } = payload; + const contactsCount = await getContactCount(campaignId); + + let lastContactId = 0; + let processed = 0; + let chunkContactResult: number; + + while ( + // eslint-disable-next-line no-cond-assign + (chunkContactResult = await processChunk({ + campaignId, + lastContactId, + unmark, + excludeNewer, + excludeAgeInHours + })) + ) { + lastContactId = chunkContactResult; + helpers.logger.debug( + `Processing ${ + unmark ? "un" : "" + }mark second pass for ${campaignId} chunk part ${lastContactId}` + ); + processed += CHUNK_SIZE; + await helpers.updateStatus(Math.round(processed / contactsCount)); + } +}; + +export const markSecondPass: ProgressTask = async ( + payload, + helpers +) => { + const { + campaignId, + campaignTitle, + organizationId, + requesterId, + unmark, + excludeNewer, + excludeAgeInHours + } = payload; + + const unPrefix = unmark ? "un" : ""; + await processContacts( + { campaignId, unmark, excludeNewer, excludeAgeInHours }, + helpers + ); + + const notificationEmail = await getNotificationEmail(requesterId); + const emailContent = await getContent({ + campaignId, + organizationId, + campaignTitle, + unmark + }); + + if (!unmark) { + await r + .knex("campaign") + .update({ autosend_status: "unstarted" }) + .where({ id: campaignId }); + } + + await sendEmail({ + to: notificationEmail, + subject: `Second pass ${unPrefix}marking complete for ${campaignTitle}`, + html: emailContent + }); + + helpers.logger.info( + `Successfully ${unPrefix}marked ${campaignId} for second pass` + ); +}; + +export const addMarkSecondPass = async (payload: MarkSecondPassPayload) => + addProgressJob({ + identifier: TASK_IDENTIFIER, + payload + }); diff --git a/src/server/tasks/chunk-tasks/mark-second-pass/utils.ts b/src/server/tasks/chunk-tasks/mark-second-pass/utils.ts new file mode 100644 index 000000000..5e9040dbd --- /dev/null +++ b/src/server/tasks/chunk-tasks/mark-second-pass/utils.ts @@ -0,0 +1,29 @@ +import { config } from "../../../../config"; +import type { ChunkTaskPayload, ProcessChunkPayload } from "../utils"; + +export type { ProgressTask } from "../../utils"; +export { addProgressJob } from "../../utils"; +export { + getChunkedContactsCte, + getContactCount, + getNotificationEmail +} from "../utils"; + +export const TASK_IDENTIFIER = "mark-second-pass"; +export const CHUNK_SIZE = config.MARK_SECOND_PASS_CHUNK_SIZE; + +interface SecondPassOptions { + unmark?: boolean; + excludeNewer?: boolean; + excludeAgeInHours?: number; +} + +export interface MarkSecondPassPayload + extends SecondPassOptions, + ChunkTaskPayload { + organizationId: number; +} + +export interface ProcessSecondPassChunkPayload + extends ProcessChunkPayload, + SecondPassOptions {} diff --git a/src/server/tasks/chunk-tasks/utils.ts b/src/server/tasks/chunk-tasks/utils.ts index 0fc5169e9..c7e8b9f85 100644 --- a/src/server/tasks/chunk-tasks/utils.ts +++ b/src/server/tasks/chunk-tasks/utils.ts @@ -49,16 +49,18 @@ export const andSqlFilter = (filter?: string) => /** * Fetch a chunk of contacts for the campaign - * @param filter a SQL condition to filter the contacts (ex. is_opted_out = true) - * @returns CTE for the selected contacts which requires query params [campaignId, lastContactId, limit] + * @param filter a SQL condition to filter the contacts for the campaign_contact alias cc + * (ex. cc.is_opted_out = true) + * @returns campaign_contacts: + * the CTE for the selected contacts which requires query params [campaignId, lastContactId, limit] */ export const getChunkedContactsCte = (filter?: string) => { return ` with campaign_contacts as ( - select * from campaign_contact + select * from campaign_contact cc where campaign_id = ? and id > ? ${andSqlFilter(filter)} - order by campaign_contact.id asc + order by cc.id asc limit ? )`; }; diff --git a/src/server/worker.ts b/src/server/worker.ts index 927538aa2..384a6d240 100644 --- a/src/server/worker.ts +++ b/src/server/worker.ts @@ -75,6 +75,8 @@ let workerSemaphore = false; export const getWorker = async (attempt = 0): Promise => { if (worker) return worker; + const chunkTasks = await wrapProgressTaskList(chunkTaskList); + const taskList: TaskList = { "handle-autoassignment-request": handleAutoassignmentRequest, "release-stale-replies": releaseStaleReplies, @@ -110,7 +112,7 @@ export const getWorker = async (attempt = 0): Promise => { [exportOptOutsIdentifier]: exportOptOuts, ...ngpVanTaskList, ...campaignBuilderTaskList, - ...wrapProgressTaskList(chunkTaskList) + ...chunkTasks }; if (!workerSemaphore) {