Skip to content

Commit

Permalink
feat: mailing list progress
Browse files Browse the repository at this point in the history
  • Loading branch information
david-nathanael committed Jun 29, 2023
1 parent 8d7bdfc commit 2eb2da8
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 246 deletions.
4 changes: 2 additions & 2 deletions server/src/modules/actions/documents.actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export const createEmptyDocument = async (options: IUploadDocumentOptions) => {
throw new Error("Missing filename");
}

const doucument = await createDocument({
const document = await createDocument({
_id: documentId,
type_document: options.type_document,
ext_fichier: options.filename.split(".").pop(),
Expand All @@ -110,7 +110,7 @@ export const createEmptyDocument = async (options: IUploadDocumentOptions) => {
updated_at: new Date(),
created_at: new Date(),
});
return doucument as IDocument;
return document as IDocument;
};

export const uploadFile = async (
Expand Down
13 changes: 6 additions & 7 deletions server/src/modules/actions/job.actions.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import { Filter, FindOptions } from "mongodb";
import { Filter, FindOptions, ObjectId } from "mongodb";
import { IJob, JOB_STATUS_LIST } from "shared/models/job.model";

import { getDbCollection } from "@/common/utils/mongodbUtils";

/**
* Création d'un job
* @param {*} data
* @returns
*/
export const createJob = async ({
name,
Expand All @@ -33,13 +31,14 @@ export const findJob = async (
return await getDbCollection("jobs").findOne<IJob>(filter, options);
};

/**
 * Fetches every job matching the given filter.
 *
 * @param filter - MongoDB filter applied to the "jobs" collection.
 * @returns All matching jobs, materialised as an array.
 */
export const findJobs = async (filter: Filter<IJob>) => {
  const cursor = getDbCollection("jobs").find<IJob>(filter);
  return await cursor.toArray();
};

/**
* Mise à jour d'un job
* @param {*} _id
* @param {Object} data
* @returns
*/
export const updateJob = async (_id, data) => {
export const updateJob = async (_id: ObjectId, data: Partial<IJob>) => {
return getDbCollection("jobs").updateOne(
{ _id },
{ $set: { ...data, updated_at: new Date() } }
Expand Down
251 changes: 107 additions & 144 deletions server/src/modules/actions/mailingLists.actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { Transform } from "node:stream";
import { pipeline } from "node:stream/promises";

import { stringify } from "csv-stringify";
import { Filter, ObjectId, UpdateFilter } from "mongodb";
import { IMailingList } from "shared/models/mailingList.model";
import { Filter, ObjectId } from "mongodb";
import { IJob } from "shared/models/job.model";
import { DOCUMENT_TYPES } from "shared/routes/upload.routes";

import logger from "@/common/logger";
Expand All @@ -16,108 +16,65 @@ import {
TrainingLink,
} from "../../common/apis/lba";
import { uploadToStorage } from "../../common/utils/ovhUtils";
import { addJob } from "../jobs/jobs";
import { noop } from "../server/utils/upload.utils";
import {
createEmptyDocument,
deleteDocumentById,
findDocument,
importDocumentContent,
} from "./documents.actions";

const DEFAULT_LOOKUP = {
from: "documents",
let: { documentId: { $toObjectId: "$document_id" } },
pipeline: [
{
$match: {
$expr: { $eq: ["$_id", "$$documentId"] },
},
},
],
as: "document",
};

const DEFAULT_UNWIND = {
path: "$document",
preserveNullAndEmptyArrays: true,
};
import { findJob, findJobs, updateJob } from "./job.actions";

/**
* CRUD
*/

interface ICreateMailingList extends Omit<IMailingList, "_id"> {}
/**
 * Parameters describing a mailing list to generate.
 *
 * These values are copied into the payload of the "generate:mailing-list"
 * job by createMailingList.
 */
export interface IMailingList {
  // Copied as-is into the job payload.
  user_id: string;
  // Used to build the output document type/filename (`mailing-list-${source}`)
  // and compared against DOCUMENT_TYPES when the list is processed.
  source: string;
  // String form of the output document's _id; converted back with
  // `new ObjectId(...)` when the output document is looked up.
  document_id?: string;
}

export const createMailingList = async (data: ICreateMailingList) => {
const { insertedId: _id } = await getDbCollection("mailingLists").insertOne(
data
);
export const createMailingList = async (data: IMailingList) => {
const outputDocument = await createEmptyDocument({
type_document: `mailing-list-${data.source}`,
filename: `mailing-list-${data.source}-${new ObjectId()}.csv`,
});

return findMailingList({ _id });
return addJob({
name: "generate:mailing-list",
payload: {
user_id: data.user_id,
source: data.source,
document_id: outputDocument._id.toString(),
},
});
};

export const findMailingList = async (filter: Filter<IMailingList>) => {
return getDbCollection("mailingLists")
.aggregate<IMailingList>([
{
$match: filter,
},
{
$lookup: DEFAULT_LOOKUP,
},
{
$unwind: DEFAULT_UNWIND,
},
])
.next();
/**
 * Looks up a single mailing-list job.
 *
 * The query defaults to jobs named "generate:mailing-list"; because the
 * caller's filter is spread afterwards, any key it provides (including
 * `name`) takes precedence.
 *
 * @param filter - Additional MongoDB filter on the job document.
 * @returns The result of findJob for the combined query.
 */
export const findMailingList = async (filter: Filter<IJob>) => {
  const query: Filter<IJob> = { name: "generate:mailing-list", ...filter };
  return findJob(query);
};

export const findMailingLists = async (filter: Filter<IMailingList>) => {
const users = await getDbCollection("mailingLists")
.aggregate<IMailingList>([
{
$match: filter,
},
{
$lookup: DEFAULT_LOOKUP,
},
{
$unwind: DEFAULT_UNWIND,
},
])
.toArray();

return users;
/**
 * Lists the mailing-list jobs matching a filter.
 *
 * Mailing lists are stored as jobs named "generate:mailing-list", so the
 * query is scoped to that name by default, mirroring findMailingList.
 * Without this, a filter that omits `name` would return jobs of every kind.
 * The caller's filter is spread last, so an explicit `name` still wins.
 *
 * @param filter - Additional MongoDB filter on the job document.
 * @returns The result of findJobs for the combined query.
 */
export const findMailingLists = async (filter: Filter<IJob>) => {
  return findJobs({
    name: "generate:mailing-list",
    ...filter,
  });
};

export const updateMailingList = async (
mailingList: IMailingList,
data: Partial<IMailingList>,
updateFilter: UpdateFilter<IMailingList> = {}
_id: ObjectId,
data: Partial<IMailingList>
) => {
return await getDbCollection("mailingLists").findOneAndUpdate(
{
_id: mailingList._id,
},
{
$set: data,
...updateFilter,
}
);
return updateJob(_id, data);
};

/**
* ACTIONS
*/

export const processMailingList = async ({ mailing_list_id }) => {
const mailingList = await findMailingList({
_id: mailing_list_id,
});
if (!mailingList) {
throw new Error("Processor > /mailing-list: Can't find mailing list");
}

export const processMailingList = async (mailingList: IMailingList) => {
switch (mailingList.source) {
case DOCUMENT_TYPES.VOEUX_PARCOURSUP_MAI_2023:
case DOCUMENT_TYPES.VOEUX_AFFELNET_MAI_2023:
Expand All @@ -132,6 +89,12 @@ export const processMailingList = async ({ mailing_list_id }) => {
};

const handleVoeuxParcoursupMai2023 = async (mailingList: IMailingList) => {
const job = await findJob({
"payload.document_id": mailingList.document_id,
});

if (!job) throw new Error("Job not found");

const document = await findDocument({
type_document: mailingList.source,
});
Expand All @@ -141,77 +104,73 @@ const handleVoeuxParcoursupMai2023 = async (mailingList: IMailingList) => {
const batchSize = LIMIT_TRAINING_LINKS_PER_REQUEST;
let skip = 0;
let hasMore = true;
let processed = 0;

const outputDocument = await createEmptyDocument({
type_document: `mailing-list-${DOCUMENT_TYPES.VOEUX_PARCOURSUP_MAI_2023}`,
filename: `mailing-list-${mailingList._id.toString()}-${
mailingList.source
}.csv`,
const outputDocument = await findDocument({
_id: new ObjectId(mailingList.document_id),
});

if (!outputDocument) throw new Error("Output document not found");

while (hasMore) {
try {
const wishes = await getDbCollection("documentContents")
.find({ document_id: document._id.toString() })
.limit(batchSize)
.skip(skip)
.toArray();

if (!wishes.length) {
hasMore = false;
continue;
}

// TODO: vérifier le nom des colonnes
const data = wishes?.map((dc) => ({
id: dc._id.toString(),
cle_ministere_educatif: dc.content?.cle_ministere_educatif ?? "",
mef: dc.content?.code_mef ?? "",
code_postal: dc.content?.code_postal ?? "",
uai: dc.content?.code_uai_etab_accueil ?? "",
cfd: dc.content?.cfd ?? "", // pas présent dans le fichier
rncp: dc.content?.rncp ?? "", // pas présent dans le fichier
email:
dc.content?.mail_responsable_1 ??
dc.content?.mail_responsable_2 ??
"",
nom_eleve: dc.content?.nom_eleve ?? "",
prenom_eleve: [
dc.content?.prenom_1 ?? "",
dc.content?.prenom_2 ?? "",
dc.content?.prenom_3 ?? "",
]
.join(" ")
.trim(),
libelle_etab_accueil: dc.content?.libelle_etab_accueil ?? "",
libelle_formation: dc.content?.libelle_formation ?? "",
}));

const tmp = (await getTrainingLinks(data)) as TrainingLink[];
const tmpContent = tmp.flat();

await importDocumentContent(outputDocument, tmpContent, (line) => line);

// Check if there are more documents to retrieve
if (wishes.length === batchSize) {
skip += batchSize;
} else {
hasMore = false;
logger.info("All documents retrieved");
}
} catch (err) {
await updateMailingList(mailingList, {
status: "error",
});
console.error("Error retrieving documents:", err);
return;
const wishes = await getDbCollection("documentContents")
.find({ document_id: document._id.toString() })
.limit(batchSize)
.skip(skip)
.toArray();

if (!wishes.length) {
hasMore = false;
continue;
}
}

await updateMailingList(mailingList, {
document_id: outputDocument._id.toString(),
status: "finished",
});
// TODO: vérifier le nom des colonnes
const data = wishes?.map((dc) => ({
id: dc._id.toString(),
cle_ministere_educatif: dc.content?.cle_ministere_educatif ?? "",
mef: dc.content?.code_mef ?? "",
code_postal: dc.content?.code_postal ?? "",
uai: dc.content?.code_uai_etab_accueil ?? "",
cfd: dc.content?.cfd ?? "", // pas présent dans le fichier
rncp: dc.content?.rncp ?? "", // pas présent dans le fichier
email:
dc.content?.mail_responsable_1 ?? dc.content?.mail_responsable_2 ?? "",
nom_eleve: dc.content?.nom_eleve ?? "",
prenom_eleve: [
dc.content?.prenom_1 ?? "",
dc.content?.prenom_2 ?? "",
dc.content?.prenom_3 ?? "",
]
.join(" ")
.trim(),
libelle_etab_accueil: dc.content?.libelle_etab_accueil ?? "",
libelle_formation: dc.content?.libelle_formation ?? "",
}));

const tmp = (await getTrainingLinks(data)) as TrainingLink[];
const tmpContent = tmp.flat();

await importDocumentContent(outputDocument, tmpContent, (line) => line);

processed += wishes.length;

await updateJob(job._id, {
payload: {
...job.payload,
processed: document.lines_count
? (processed / document.lines_count) * 100 //
: 0,
processed_count: processed,
},
});
// Check if there are more documents to retrieve
if (wishes.length === batchSize) {
skip += batchSize;
} else {
hasMore = false;
logger.info("All documents retrieved");
}
}
};

async function* getLine(cursor) {
Expand Down Expand Up @@ -309,7 +268,11 @@ export const createMailingListFile = async (document: any) => {
// });
};

export const deleteMailingList = async (mailingList: IMailingList) => {
await deleteDocumentById(new ObjectId(mailingList.document_id));
await getDbCollection("mailingLists").deleteOne({ _id: mailingList._id });
/**
 * Deletes a mailing-list job and, when it has one, its output document.
 *
 * @param mailingList - The job to remove; `payload.document_id`, if set,
 *   identifies the document to delete first.
 */
export const deleteMailingList = async (mailingList: IJob) => {
  const documentId = mailingList.payload?.document_id;

  // Remove the generated document before the job that references it.
  if (documentId) {
    await deleteDocumentById(new ObjectId(documentId as string));
  }

  const jobs = getDbCollection("jobs");
  await jobs.deleteOne({ _id: mailingList._id });
};
6 changes: 4 additions & 2 deletions server/src/modules/jobs/executeJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { formatDuration, intervalToDuration } from "date-fns";
import { IJob, JOB_STATUS_LIST } from "shared/models/job.model";

import logger from "@/common/logger";
import { updateJob } from "@/modules/actions/job.actions";
import { findJob, updateJob } from "@/modules/actions/job.actions";

import { closeMongodbConnection } from "../../common/utils/mongodbUtils";

Expand Down Expand Up @@ -40,9 +40,11 @@ export const executeJob = async (
const duration = formatDuration(
intervalToDuration({ start: startDate, end: endDate })
);
// récupérer la version la plus récente pour conserver les éventuelles modifications effectuées durant l'exécution du job
const finishedJob = (await findJob({ _id: job._id })) as IJob;
await updateJob(job._id, {
status: error ? JOB_STATUS_LIST.ERRORED : JOB_STATUS_LIST.FINISHED,
payload: { duration, result, error },
payload: { ...finishedJob.payload, duration, result, error },
ended_at: endDate,
});
if (options.runningLogs) logger.info(`Job: ${job.name} Ended`);
Expand Down
Loading

0 comments on commit 2eb2da8

Please sign in to comment.