diff --git a/server/src/modules/actions/documents.actions.ts b/server/src/modules/actions/documents.actions.ts index eae697790..ad421bb99 100644 --- a/server/src/modules/actions/documents.actions.ts +++ b/server/src/modules/actions/documents.actions.ts @@ -95,7 +95,7 @@ export const createEmptyDocument = async (options: IUploadDocumentOptions) => { throw new Error("Missing filename"); } - const doucument = await createDocument({ + const document = await createDocument({ _id: documentId, type_document: options.type_document, ext_fichier: options.filename.split(".").pop(), @@ -110,7 +110,7 @@ export const createEmptyDocument = async (options: IUploadDocumentOptions) => { updated_at: new Date(), created_at: new Date(), }); - return doucument as IDocument; + return document as IDocument; }; export const uploadFile = async ( diff --git a/server/src/modules/actions/job.actions.ts b/server/src/modules/actions/job.actions.ts index 7e8860f27..b2bf153db 100644 --- a/server/src/modules/actions/job.actions.ts +++ b/server/src/modules/actions/job.actions.ts @@ -1,12 +1,10 @@ -import { Filter, FindOptions } from "mongodb"; +import { Filter, FindOptions, ObjectId } from "mongodb"; import { IJob, JOB_STATUS_LIST } from "shared/models/job.model"; import { getDbCollection } from "@/common/utils/mongodbUtils"; /** * Création d'un job - * @param {*} data - * @returns */ export const createJob = async ({ name, @@ -33,13 +31,14 @@ export const findJob = async ( return await getDbCollection("jobs").findOne(filter, options); }; +export const findJobs = async (filter: Filter) => { + return await getDbCollection("jobs").find(filter).toArray(); +}; + /** * Mise à jour d'un job - * @param {*} _id - * @param {Object} data - * @returns */ -export const updateJob = async (_id, data) => { +export const updateJob = async (_id: ObjectId, data: Partial) => { return getDbCollection("jobs").updateOne( { _id }, { $set: { ...data, updated_at: new Date() } } diff --git a/server/src/modules/actions/mailingLists.actions.ts b/server/src/modules/actions/mailingLists.actions.ts index 2ea847892..8c07d81ab 100644 --- a/server/src/modules/actions/mailingLists.actions.ts +++ b/server/src/modules/actions/mailingLists.actions.ts @@ -2,8 +2,8 @@ import { Transform } from "node:stream"; import { pipeline } from "node:stream/promises"; import { stringify } from "csv-stringify"; -import { Filter, ObjectId, UpdateFilter } from "mongodb"; -import { IMailingList } from "shared/models/mailingList.model"; +import { Filter, ObjectId } from "mongodb"; +import { IJob } from "shared/models/job.model"; import { DOCUMENT_TYPES } from "shared/routes/upload.routes"; import logger from "@/common/logger"; @@ -16,6 +16,7 @@ import { TrainingLink, } from "../../common/apis/lba"; import { uploadToStorage } from "../../common/utils/ovhUtils"; +import { addJob } from "../jobs/jobs"; import { noop } from "../server/utils/upload.utils"; import { createEmptyDocument, @@ -23,101 +24,57 @@ import { findDocument, importDocumentContent, } from "./documents.actions"; - -const DEFAULT_LOOKUP = { - from: "documents", - let: { documentId: { $toObjectId: "$document_id" } }, - pipeline: [ - { - $match: { - $expr: { $eq: ["$_id", "$$documentId"] }, - }, - }, - ], - as: "document", -}; - -const DEFAULT_UNWIND = { - path: "$document", - preserveNullAndEmptyArrays: true, -}; +import { findJob, findJobs, updateJob } from "./job.actions"; /** * CRUD */ -interface ICreateMailingList extends Omit {} +export interface IMailingList { + user_id: string; + source: string; + document_id?: string; +} -export const createMailingList = async (data: ICreateMailingList) => { - const { insertedId: _id } = await getDbCollection("mailingLists").insertOne( - data - ); +export const createMailingList = async (data: IMailingList) => { + const outputDocument = await createEmptyDocument({ + type_document: `mailing-list-${data.source}`, + filename: `mailing-list-${data.source}-${new ObjectId()}.csv`, + }); - return findMailingList({ _id }); + return addJob({ + name: "generate:mailing-list", + payload: { + user_id: data.user_id, + source: data.source, + document_id: outputDocument._id.toString(), + }, + }); }; -export const findMailingList = async (filter: Filter) => { - return getDbCollection("mailingLists") - .aggregate([ - { - $match: filter, - }, - { - $lookup: DEFAULT_LOOKUP, - }, - { - $unwind: DEFAULT_UNWIND, - }, - ]) - .next(); +export const findMailingList = async (filter: Filter) => { + return findJob({ + name: "generate:mailing-list", + ...filter, + }); }; -export const findMailingLists = async (filter: Filter) => { - const users = await getDbCollection("mailingLists") - .aggregate([ - { - $match: filter, - }, - { - $lookup: DEFAULT_LOOKUP, - }, - { - $unwind: DEFAULT_UNWIND, - }, - ]) - .toArray(); - - return users; +export const findMailingLists = async (filter: Filter) => { + return findJobs(filter); }; export const updateMailingList = async ( - mailingList: IMailingList, - data: Partial, - updateFilter: UpdateFilter = {} + _id: ObjectId, + data: Partial ) => { - return await getDbCollection("mailingLists").findOneAndUpdate( - { - _id: mailingList._id, - }, - { - $set: data, - ...updateFilter, - } - ); + return updateJob(_id, data); }; /** * ACTIONS */ -export const processMailingList = async ({ mailing_list_id }) => { - const mailingList = await findMailingList({ - _id: mailing_list_id, - }); - if (!mailingList) { - throw new Error("Processor > /mailing-list: Can't find mailing list"); - } - +export const processMailingList = async (mailingList: IMailingList) => { switch (mailingList.source) { case DOCUMENT_TYPES.VOEUX_PARCOURSUP_MAI_2023: case DOCUMENT_TYPES.VOEUX_AFFELNET_MAI_2023: @@ -132,6 +89,12 @@ export const processMailingList = async ({ mailing_list_id }) => { }; const handleVoeuxParcoursupMai2023 = async (mailingList: IMailingList) => { + const job = await findJob({ + "payload.document_id": mailingList.document_id, + }); + + if (!job) throw new Error("Job not found"); + const document = await findDocument({ type_document: mailingList.source, }); @@ -141,77 +104,73 @@ const handleVoeuxParcoursupMai2023 = async (mailingList: IMailingList) => { const batchSize = LIMIT_TRAINING_LINKS_PER_REQUEST; let skip = 0; let hasMore = true; + let processed = 0; - const outputDocument = await createEmptyDocument({ - type_document: `mailing-list-${DOCUMENT_TYPES.VOEUX_PARCOURSUP_MAI_2023}`, - filename: `mailing-list-${mailingList._id.toString()}-${ - mailingList.source - }.csv`, + const outputDocument = await findDocument({ + _id: new ObjectId(mailingList.document_id), }); + if (!outputDocument) throw new Error("Output document not found"); + while (hasMore) { - try { - const wishes = await getDbCollection("documentContents") - .find({ document_id: document._id.toString() }) - .limit(batchSize) - .skip(skip) - .toArray(); - - if (!wishes.length) { - hasMore = false; - continue; - } - - // TODO: vérifier le nom des colonnes - const data = wishes?.map((dc) => ({ - id: dc._id.toString(), - cle_ministere_educatif: dc.content?.cle_ministere_educatif ?? "", - mef: dc.content?.code_mef ?? "", - code_postal: dc.content?.code_postal ?? "", - uai: dc.content?.code_uai_etab_accueil ?? "", - cfd: dc.content?.cfd ?? "", // pas présent dans le fichier - rncp: dc.content?.rncp ?? "", // pas présent dans le fichier - email: - dc.content?.mail_responsable_1 ?? - dc.content?.mail_responsable_2 ?? - "", - nom_eleve: dc.content?.nom_eleve ?? "", - prenom_eleve: [ - dc.content?.prenom_1 ?? "", - dc.content?.prenom_2 ?? "", - dc.content?.prenom_3 ?? "", - ] - .join(" ") - .trim(), - libelle_etab_accueil: dc.content?.libelle_etab_accueil ?? "", - libelle_formation: dc.content?.libelle_formation ?? "", - })); - - const tmp = (await getTrainingLinks(data)) as TrainingLink[]; - const tmpContent = tmp.flat(); - - await importDocumentContent(outputDocument, tmpContent, (line) => line); - - // Check if there are more documents to retrieve - if (wishes.length === batchSize) { - skip += batchSize; - } else { - hasMore = false; - logger.info("All documents retrieved"); - } - } catch (err) { - await updateMailingList(mailingList, { - status: "error", - }); - console.error("Error retrieving documents:", err); - return; + const wishes = await getDbCollection("documentContents") + .find({ document_id: document._id.toString() }) + .limit(batchSize) + .skip(skip) + .toArray(); + + if (!wishes.length) { + hasMore = false; + continue; } - } - await updateMailingList(mailingList, { - document_id: outputDocument._id.toString(), - status: "finished", - }); + // TODO: vérifier le nom des colonnes + const data = wishes?.map((dc) => ({ + id: dc._id.toString(), + cle_ministere_educatif: dc.content?.cle_ministere_educatif ?? "", + mef: dc.content?.code_mef ?? "", + code_postal: dc.content?.code_postal ?? "", + uai: dc.content?.code_uai_etab_accueil ?? "", + cfd: dc.content?.cfd ?? "", // pas présent dans le fichier + rncp: dc.content?.rncp ?? "", // pas présent dans le fichier + email: + dc.content?.mail_responsable_1 ?? dc.content?.mail_responsable_2 ?? "", + nom_eleve: dc.content?.nom_eleve ?? "", + prenom_eleve: [ + dc.content?.prenom_1 ?? "", + dc.content?.prenom_2 ?? "", + dc.content?.prenom_3 ?? "", + ] + .join(" ") + .trim(), + libelle_etab_accueil: dc.content?.libelle_etab_accueil ?? "", + libelle_formation: dc.content?.libelle_formation ?? "", + })); + + const tmp = (await getTrainingLinks(data)) as TrainingLink[]; + const tmpContent = tmp.flat(); + + await importDocumentContent(outputDocument, tmpContent, (line) => line); + + processed += wishes.length; + + await updateJob(job._id, { + payload: { + ...job.payload, + processed: document.lines_count + ? (processed / document.lines_count) * 100 // + : 0, + processed_count: processed, + }, + }); + // Check if there are more documents to retrieve + if (wishes.length === batchSize) { + skip += batchSize; + } else { + hasMore = false; + logger.info("All documents retrieved"); + } + } }; async function* getLine(cursor) { @@ -309,7 +268,11 @@ export const createMailingListFile = async (document: any) => { // }); }; -export const deleteMailingList = async (mailingList: IMailingList) => { - await deleteDocumentById(new ObjectId(mailingList.document_id)); - await getDbCollection("mailingLists").deleteOne({ _id: mailingList._id }); +export const deleteMailingList = async (mailingList: IJob) => { + if (mailingList.payload?.document_id) { + await deleteDocumentById( + new ObjectId(mailingList.payload.document_id as string) + ); + } + await getDbCollection("jobs").deleteOne({ _id: mailingList._id }); }; diff --git a/server/src/modules/jobs/executeJob.ts b/server/src/modules/jobs/executeJob.ts index c625c1930..865a4ebab 100644 --- a/server/src/modules/jobs/executeJob.ts +++ b/server/src/modules/jobs/executeJob.ts @@ -2,7 +2,7 @@ import { formatDuration, intervalToDuration } from "date-fns"; import { IJob, JOB_STATUS_LIST } from "shared/models/job.model"; import logger from "@/common/logger"; -import { updateJob } from "@/modules/actions/job.actions"; +import { findJob, updateJob } from "@/modules/actions/job.actions"; import { closeMongodbConnection } from "../../common/utils/mongodbUtils"; @@ -40,9 +40,11 @@ export const executeJob = async ( const duration = formatDuration( intervalToDuration({ start: startDate, end: endDate }) ); + // récupérer la version la plus récente pour conserver les éventuelles modifications effectuées durant l'éxecution du job + const finishedJob = (await findJob({ _id: job._id })) as IJob; await updateJob(job._id, { status: error ? JOB_STATUS_LIST.ERRORED : JOB_STATUS_LIST.FINISHED, - payload: { duration, result, error }, + payload: { ...finishedJob.payload, duration, result, error }, ended_at: endDate, }); if (options.runningLogs) logger.info(`Job: ${job.name} Ended`); diff --git a/server/src/modules/server/mailingList.routes.ts b/server/src/modules/server/mailingList.routes.ts index ed5f462ec..9febee2f1 100644 --- a/server/src/modules/server/mailingList.routes.ts +++ b/server/src/modules/server/mailingList.routes.ts @@ -4,15 +4,13 @@ import Boom from "@hapi/boom"; import { ObjectId } from "mongodb"; import { oleoduc } from "oleoduc"; import { IUser } from "shared/models/user.model"; -import { - SReqGetMailingList, - SResGetMailingLists, -} from "shared/routes/mailingList.routes"; +import { SReqGetMailingList } from "shared/routes/mailingList.routes"; import { Readable } from "stream"; import logger from "../../common/logger"; import * as crypto from "../../common/utils/cryptoUtils"; import { getFromStorage } from "../../common/utils/ovhUtils"; +import { findDocument } from "../actions/documents.actions"; import { createMailingList, createMailingListFile, @@ -20,7 +18,6 @@ import { findMailingList, findMailingLists, } from "../actions/mailingLists.actions"; -import { addJob } from "../jobs/jobs"; import { Server } from "./server"; import { noop } from "./utils/upload.utils"; @@ -40,37 +37,19 @@ export const mailingListRoutes = ({ server }: { server: Server }) => { const { source } = request.body; const user = request.user as IUser; - const mailingList = await createMailingList({ - source, - status: "pending", - updated_at: new Date(), - created_at: new Date(), - user_id: user._id.toString(), - }); + try { + await createMailingList({ user_id: user._id.toString(), source }); - if (!mailingList) { + return response.status(200).send(); + } catch (error) { throw Boom.badData("Impossible de créer la liste de diffusion"); } - - await addJob({ - name: "generate:mailing-list", - payload: { - mailing_list_id: mailingList._id, - }, - }); - - return response.status(200).send(mailingList); } ); server.get( "/mailing-lists", { - schema: { - response: { - 200: SResGetMailingLists, - }, - } as const, preHandler: server.auth([ server.validateSession, // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -80,7 +59,7 @@ export const mailingListRoutes = ({ server }: { server: Server }) => { const user = request.user as IUser; const mailingLists = await findMailingLists({ - user_id: user._id.toString(), + "payload.user_id": user._id.toString(), }); return response.status(200).send(mailingLists as any); @@ -116,16 +95,25 @@ export const mailingListRoutes = ({ server }: { server: Server }) => { */ if ( - !mailingList?.document || - user?._id.toString() !== mailingList?.user_id + !mailingList || + !mailingList.payload?.document_id || + user?._id.toString() !== mailingList.payload?.user_id ) { throw Boom.forbidden("Forbidden"); } + const document = await findDocument({ + _id: new ObjectId(mailingList.payload.document_id as string), + }); + + if (!document) { + throw Boom.badData("Impossible de télécharger le fichier"); + } + let stream: IncomingMessage | Readable; let fileNotFound = false; try { - stream = await getFromStorage(mailingList.document.chemin_fichier); + stream = await getFromStorage(document.chemin_fichier); } catch (error: any) { if (error.message.includes("Status code 404")) { fileNotFound = true; @@ -136,20 +124,20 @@ export const mailingListRoutes = ({ server }: { server: Server }) => { if (fileNotFound) { logger.info("file not found"); - await createMailingListFile(mailingList.document); - stream = await getFromStorage(mailingList.document.chemin_fichier); + await createMailingListFile(document); + stream = await getFromStorage(document.chemin_fichier); } response.raw.writeHead(200, { "Content-Type": "application/octet-stream", - "Content-Disposition": `attachment; filename="${mailingList.document.nom_fichier}"`, + "Content-Disposition": `attachment; filename="${document.nom_fichier}"`, }); await oleoduc( // @ts-ignore stream, crypto.isCipherAvailable() - ? crypto.decipher(mailingList.document.hash_secret) + ? crypto.decipher(document.hash_secret) : noop(), response.raw ); diff --git a/shared/routes/mailingList.routes.ts b/shared/routes/mailingList.routes.ts index 1425311cc..08d3d936a 100644 --- a/shared/routes/mailingList.routes.ts +++ b/shared/routes/mailingList.routes.ts @@ -1,6 +1,5 @@ import { FromSchema } from "json-schema-to-ts"; -import { deserialize } from ".."; import { DOCUMENT_TYPES } from "./upload.routes"; export const SReqGetMailingList = { @@ -12,45 +11,3 @@ export const SReqGetMailingList = { } as const; export type IReqGetMailingList = FromSchema; - -export const SResGetMailingList = { - type: "object", - properties: { - _id: { type: "string", format: "ObjectId" }, - source: { - type: "string", - }, - document_id: { - type: "string", - description: "Fichier liste de diffusion", - }, - status: { - type: "string", - }, - user_id: { - type: "string", - }, - updated_at: { - type: "string", - format: "date-time", - description: "Date de mise à jour en base de données", - }, - created_at: { - type: "string", - format: "date-time", - description: "Date d'ajout en base de données", - }, - }, - required: ["_id", "source", "status", "user_id"], - additionalProperties: false, -} as const; - -export type IResGetMailingList = FromSchema< - typeof SReqGetMailingList, - { deserialize: deserialize } ->; - -export const SResGetMailingLists = { - type: "array", - items: SResGetMailingList, -} as const; diff --git a/ui/app/liste-diffusion/page.tsx b/ui/app/liste-diffusion/page.tsx index 58babcb31..3de80a768 100644 --- a/ui/app/liste-diffusion/page.tsx +++ b/ui/app/liste-diffusion/page.tsx @@ -12,10 +12,7 @@ import { } from "@chakra-ui/react"; import { useQuery } from "@tanstack/react-query"; import { useForm } from "react-hook-form"; -import { - IMailingList, - MAILING_LIST_STATUS, -} from "shared/models/mailingList.model"; +import { IJob, JOB_STATUS_LIST } from "shared/models/job.model"; import { IReqGetMailingList } from "shared/routes/mailingList.routes"; import { DOCUMENT_TYPES } from "shared/routes/upload.routes"; @@ -29,7 +26,7 @@ import Breadcrumb, { PAGES } from "../components/breadcrumb/Breadcrumb"; const ListeDiffusionPage = () => { const toast = useToast(); - const { data: mailingLists, refetch } = useQuery({ + const { data: mailingLists, refetch } = useQuery({ queryKey: ["mailingLists"], queryFn: async () => { const { data } = await api.get("/mailing-lists"); @@ -119,16 +116,31 @@ const ListeDiffusionPage = () => { id: "source", size: 100, header: () => "Source", + cell: ({ row }) => row.original.payload?.source, }, status: { id: "status", size: 100, header: () => "Statut", cell: ({ row }) => { - return { - pending: "En cours de génération", - finished: "Terminé", - }[row.original.status]; + return ( + <> + { + { + pending: "En attente", + will_start: "Programmé", + running: "En cours", + finished: "Terminé", + blocked: "Bloqué", + errored: "Erreur", + }[row.original.status] + } + {row.original.status === JOB_STATUS_LIST.RUNNING && + row.original.payload?.processed && ( + <> ({row.original.payload?.processed} %) + )} + + ); }, }, date: { @@ -152,10 +164,7 @@ const ListeDiffusionPage = () => { size: 25, header: () => "Actions", cell: ({ row }) => { - if ( - row.original.status !== MAILING_LIST_STATUS.FINISHED || - !row.original.document_id - ) { + if (row.original.status !== JOB_STATUS_LIST.FINISHED) { return null; }