diff --git a/.talismanrc b/.talismanrc index 23555ddb1..fa5947a76 100644 --- a/.talismanrc +++ b/.talismanrc @@ -8,7 +8,7 @@ fileignoreconfig: - filename: .bin/scripts/setup-local-env.sh checksum: c688cb656af49c2f824ae7a0c9664bf7bdecdcf29d93ec4a99976fb08dcd9285 - filename: .infra/files/configs/mongodb/seed.gpg - checksum: 086c2803a427b3f3c48906bcfd04cd36fe61b3a757c349aa345371e47bdfb505 + checksum: b28ed98d8847151cf88eb36d3f7f45804138789ada46b2c00964f35b86f5c609 - filename: .infra/vault/vault.yml checksum: 796635402459cd6f6edb18a01b54bb2ecbb75067335ec7b3a6fd291dfd25e806 - filename: server/src/common/actions/formations.actions.ts diff --git a/server/src/common/actions/engine/engine.actions.ts b/server/src/common/actions/engine/engine.actions.ts index def19868b..a64cc01e1 100644 --- a/server/src/common/actions/engine/engine.actions.ts +++ b/server/src/common/actions/engine/engine.actions.ts @@ -1,6 +1,6 @@ import { isEqual } from "date-fns"; -import { cloneDeep, get } from "lodash-es"; -import { Collection, WithoutId } from "mongodb"; +import { cloneDeep } from "lodash-es"; +import { Collection } from "mongodb"; import { IEffectif } from "shared/models/data/effectifs.model"; import { IEffectifDECA } from "shared/models/data/effectifsDECA.model"; import { IEffectifQueue } from "shared/models/data/effectifsQueue.model"; @@ -91,19 +91,37 @@ export const completeEffectifAddress = async , db: Collection) => { - const queryKeys = ["id_erp_apprenant", "organisme_id", "formation.cfd", "annee_scolaire"]; - // Recherche de l'effectif via sa clé d'unicité - const query = queryKeys.reduce((acc, item) => ({ ...acc, [item]: get(effectif, item) }), {}); +export const checkIfEffectifExists = async ( + effectif: Pick, + db: Collection +): Promise => { + const effectifs = await db + .find({ + id_erp_apprenant: effectif.id_erp_apprenant, + organisme_id: effectif.organisme_id, + annee_scolaire: effectif.annee_scolaire, + }) + .toArray(); + + const newCfd = effectif.formation?.cfd ?? null; + const newRncp = effectif.formation?.rncp ?? null; + + const macthingEffectifs = effectifs.filter((eff) => { + const currentCfd = eff.formation?.cfd ?? null; + const currentRncp = eff.formation?.rncp ?? null; + + // Si le CFD est null, on considère qu'on met à jour l'effectif + const isSameCfd = currentCfd === newCfd || currentCfd === null || newCfd === null; + const isSameRncp = currentRncp === newRncp || currentRncp === null || newRncp === null; + + return isSameCfd && isSameRncp; + }); + + if (macthingEffectifs.length === 0) { + return null; + } - return await db.findOne(query); + return macthingEffectifs[0]; }; /** diff --git a/server/src/db/migrations/20241211154816-deduplication-effectifs-auto.ts b/server/src/db/migrations/20241211154816-deduplication-effectifs-auto.ts new file mode 100644 index 000000000..31a7f640c --- /dev/null +++ b/server/src/db/migrations/20241211154816-deduplication-effectifs-auto.ts @@ -0,0 +1,133 @@ +import { addJob } from "job-processor"; +import type { ObjectId } from "mongodb"; +import { MOTIF_SUPPRESSION } from "shared/constants"; + +import { softDeleteEffectif } from "@/common/actions/effectifs.actions"; +import { effectifsDb, effectifsDECADb } from "@/common/model/collections"; +import { recreateIndexes } from "@/jobs/db/recreateIndexes"; + +export const up = async () => { + const effectifDuplicats = await effectifsDb() + .aggregate<{ + _id: { + annee_scolaire: string; + cfd: string; + rncp: string; + id_erp_apprenant: string; + organisme_id: string; + }; + count: number; + docs: { + id: ObjectId; + updated_at: Date; + }[]; + }>([ + { + $group: { + _id: { + annee_scolaire: "$annee_scolaire", + cfd: "$formation.cfd", + rncp: "$formation.rncp", + id_erp_apprenant: "$id_erp_apprenant", + organisme_id: "$organisme_id", + }, + count: { + $sum: 1, + }, + docs: { + $addToSet: { + id: "$_id", + updated_at: "$updated_at", + }, + }, + }, + }, + { + $match: { count: { $gt: 1 } }, + }, + ]) + .toArray(); + + for (const effectifDuplicat of effectifDuplicats) { + const { docs } = effectifDuplicat; + const lastUpdatedDoc = docs.reduce((acc, doc) => { + if (doc.updated_at.getTime() > acc.updated_at.getTime()) { + return doc; + } + + return acc; + }, docs[0]); + + await softDeleteEffectif(lastUpdatedDoc.id, null, { + motif: MOTIF_SUPPRESSION.Doublon, + description: "Suppression du doublon suite à la migration index unique sur effectifs", + }); + } + + const effectifDecaDuplicats = await effectifsDECADb() + .aggregate<{ + _id: { + annee_scolaire: string; + cfd: string; + rncp: string; + id_erp_apprenant: string; + organisme_id: string; + }; + count: number; + docs: { + id: ObjectId; + updated_at: Date; + }[]; + }>([ + { + $group: { + _id: { + annee_scolaire: "$annee_scolaire", + cfd: "$formation.cfd", + rncp: "$formation.rncp", + id_erp_apprenant: "$id_erp_apprenant", + organisme_id: "$organisme_id", + }, + count: { + $sum: 1, + }, + docs: { + $addToSet: { + id: "$_id", + updated_at: "$updated_at", + }, + }, + }, + }, + { + $match: { count: { $gt: 1 } }, + }, + ]) + .toArray(); + + for (const effectifDuplicat of effectifDecaDuplicats) { + const { docs } = effectifDuplicat; + const lastUpdatedDoc = docs.reduce((acc, doc) => { + if (doc.updated_at.getTime() > acc.updated_at.getTime()) { + return doc; + } + + return acc; + }, docs[0]); + + await effectifsDECADb().deleteMany({ _id: { $ne: lastUpdatedDoc.id } }); + } + + // Recreate indexes before dropping unique index as it will be used to create new indexes + await recreateIndexes({ drop: false }); + + // DROP unique index + await effectifsDb().dropIndex( + "organisme_id_1_annee_scolaire_1_id_erp_apprenant_1_apprenant.nom_1_apprenant.prenom_1_formation.cfd_1_formation.annee_1" + ); + + await addJob({ + name: "tmp:migration:duplicat-formation", + queued: true, + }); +}; diff --git a/server/src/jobs/hydrate/deca/hydrate-deca-raw.ts b/server/src/jobs/hydrate/deca/hydrate-deca-raw.ts index 378ea9233..185fecc7d 100644 --- a/server/src/jobs/hydrate/deca/hydrate-deca-raw.ts +++ b/server/src/jobs/hydrate/deca/hydrate-deca-raw.ts @@ -1,7 +1,7 @@ import { normalize } from "path"; import { captureException } from "@sentry/node"; -import { MongoClient, ObjectId, WithoutId } from "mongodb"; +import { MongoClient, MongoError, ObjectId, WithoutId } from "mongodb"; import { SOURCE_APPRENANT } from "shared/constants"; import { IOrganisme } from "shared/models"; import { IRawBalDeca } from "shared/models/data/airbyteRawBalDeca.model"; @@ -96,20 +96,35 @@ export async function hydrateDecaRaw() { } } -async function updateEffectifDeca(document: IRawBalDeca, count: { created: number; updated: number }) { - const newDocuments: WithoutId[] = await transformDocument(document); - const organismesId = new Set(); - for (const newDocument of newDocuments) { - const effectifFound = await checkIfEffectifExists(newDocument, effectifsDECADb()); +async function upsertEffectifDeca( + effectif: WithoutId, + count: { created: number; updated: number }, + retry: boolean = true +) { + const effectifFound = await checkIfEffectifExists(effectif, effectifsDECADb()); - if (!effectifFound) { - await effectifsDECADb().insertOne(newDocument as IEffectifDECA); + if (!effectifFound) { + try { + await effectifsDECADb().insertOne({ ...effectif, _id: new ObjectId() }); count.created++; - } else { - await effectifsDECADb().updateOne({ _id: effectifFound._id }, { $set: newDocument }); - count.updated++; + } catch (err) { + // Le code d'erreur 11000 correspond à une duplication d'index unique + // Ce cas arrive lors du traitement concurrentiel du meme effectif dans la queue + if (retry && err instanceof MongoError && err.code === 11000) { + return upsertEffectifDeca(effectif, count, false); + } } + } else { + await effectifsDECADb().updateOne({ _id: effectifFound._id }, { $set: effectif }); + count.updated++; + } +} +async function updateEffectifDeca(document: IRawBalDeca, count: { created: number; updated: number }) { + const newDocuments: WithoutId[] = await transformDocument(document); + const organismesId = new Set(); + for (const newDocument of newDocuments) { + await upsertEffectifDeca(newDocument, count); organismesId.add(newDocument.organisme_id.toString()); } diff --git a/server/src/jobs/ingestion/process-ingestion.ts b/server/src/jobs/ingestion/process-ingestion.ts index a7a62059a..a4ea18ae0 100644 --- a/server/src/jobs/ingestion/process-ingestion.ts +++ b/server/src/jobs/ingestion/process-ingestion.ts @@ -490,7 +490,7 @@ const createOrUpdateEffectif = async ( }, }; const itemProcessingInfos: ItemProcessingInfos = {}; - let effectifDb = await checkIfEffectifExists(effectifWithComputedFields, effectifsDb()); + let effectifDb = await checkIfEffectifExists(effectifWithComputedFields, effectifsDb()); itemProcessingInfos.effectif_new = !effectifDb; try { diff --git a/server/src/jobs/jobs.ts b/server/src/jobs/jobs.ts index 906fa956f..ab857f949 100644 --- a/server/src/jobs/jobs.ts +++ b/server/src/jobs/jobs.ts @@ -18,10 +18,6 @@ import { findInvalidDocuments } from "./db/findInvalidDocuments"; import { recreateIndexes } from "./db/recreateIndexes"; import { validateModels } from "./db/schemaValidation"; import { sendReminderEmails } from "./emails/reminder"; -import { - fiabilisationEffectifFormation, - getEffectifCertification, -} from "./fiabilisation/certification/fiabilisation-certification"; import { transformSansContratsToAbandonsDepuis, transformRupturantsToAbandonsDepuis } from "./fiabilisation/effectifs"; import { hydrateRaisonSocialeEtEnseigneOFAInconnus } from "./fiabilisation/ofa-inconnus"; import { updateOrganismesFiabilisationStatut } from "./fiabilisation/uai-siret/updateFiabilisation"; @@ -338,68 +334,96 @@ export async function setupJobProcessor() { return createMigration(job.payload as any); }, }, - "tmp:migration:formation-certification": { + "tmp:migration:duplicat-formation": { handler: async (job, signal) => { // In case of interruption, we can restart the job from the last processed effectif // Any updated effectif has either been updated by the job or has been updated by the processing queue - const processEffectif = async (effectif: IEffectif) => { - const certification = await getEffectifCertification(effectif); - - const update = { - formation: fiabilisationEffectifFormation(effectif, certification), - "_raw.formation": effectif.formation, - _computed: { - ...effectif._computed, - formation: { - ...effectif._computed?.formation, - codes_rome: certification?.domaines.rome.rncp?.map(({ code }) => code) ?? null, + const cursor = effectifsDb().aggregate<{ + _id: { + annee_scolaire: IEffectif["annee_scolaire"]; + id_erp_apprenant: IEffectif["id_erp_apprenant"]; + organisme_id: IEffectif["organisme_id"]; + }; + count: number; + effectifs: IEffectif[]; + }>([ + { + $sort: { + organisme_id: 1, + id_erp_apprenant: 1, + annee_scolaire: 1, + }, + }, + { + $group: { + _id: { + annee_scolaire: "$annee_scolaire", + id_erp_apprenant: "$id_erp_apprenant", + organisme_id: "$organisme_id", }, + count: { $sum: 1 }, + effectifs: { $addToSet: "$$ROOT" }, }, - }; + }, + { $match: { count: { $gt: 1 } } }, + ]); + + for await (const doc of cursor) { + const validEffectifs: IEffectif[] = []; + const duplicatedEffectifs: IEffectif[] = []; + + // Sort effectifs in reverse updated_at order in order to keep the most recent effectif + const effectifs = doc.effectifs.toSorted((a, b) => { + if (a.updated_at == null) return 1; + if (b.updated_at == null) return -1; + return a.updated_at.getTime() > b.updated_at.getTime() ? -1 : 1; + }); + + for (const effectif of effectifs) { + if (validEffectifs.length === 0) { + validEffectifs.push(effectif); + continue; + } - await effectifsDb() - .updateOne({ _id: effectif._id }, { $set: update }) - .catch(async (err) => { - // If the document is a duplicated effectif, we can safely remove the older document - if (err instanceof MongoError && err.code === 11000) { - await softDeleteEffectif(effectif._id, null, { - motif: MOTIF_SUPPRESSION.Doublon, - description: "Suppression du doublon suite à la migration des formations", - }); - return; - } + const currentCfd = effectif.formation?.cfd ?? null; + const currentRncp = effectif.formation?.rncp ?? null; - throw err; - }); - }; + if (!currentCfd && !currentRncp) { + duplicatedEffectifs.push(effectif); + continue; + } - const cursorEffectif = effectifsDb().find( - { created_at: { $lte: job.created_at } }, - { sort: { created_at: -1 } } - ); - let bulkEffectifs: IEffectif[] = []; - for await (const effectif of cursorEffectif) { - if (effectif._raw?.formation) { - // Already migrated - continue; - } + const isDuplicated = validEffectifs.some((validEffectif) => { + const validCfd = validEffectif.formation?.cfd ?? null; + const validRncp = validEffectif.formation?.rncp ?? null; + + const isSameCfd = validCfd === null || currentCfd === null || validCfd === currentCfd; + const isSameRncp = validRncp === null || currentRncp === null || validRncp === currentRncp; - bulkEffectifs.push(effectif); + return isSameCfd && isSameRncp; + }); - if (bulkEffectifs.length > 100) { - await Promise.all(bulkEffectifs.map(processEffectif)); - if (signal.aborted) { - return; + if (isDuplicated) { + duplicatedEffectifs.push(effectif); + } else { + validEffectifs.push(effectif); } - bulkEffectifs = []; } - } - if (bulkEffectifs.length > 0) { - await Promise.all(bulkEffectifs.map(processEffectif)); - } - // TODO: Formation v2 migration + await Promise.all( + duplicatedEffectifs.map(async (effectif) => + softDeleteEffectif(effectif._id, null, { + motif: MOTIF_SUPPRESSION.Doublon, + description: "Doublon de formation", + }) + ) + ); + + if (signal.aborted) { + return; + } + } }, resumable: true, }, diff --git a/shared/models/data/effectifs.model.ts b/shared/models/data/effectifs.model.ts index a90c4f318..a7ff292de 100644 --- a/shared/models/data/effectifs.model.ts +++ b/shared/models/data/effectifs.model.ts @@ -26,10 +26,8 @@ const indexes: [IndexSpecification, CreateIndexesOptions][] = [ organisme_id: 1, annee_scolaire: 1, id_erp_apprenant: 1, - "apprenant.nom": 1, - "apprenant.prenom": 1, "formation.cfd": 1, - "formation.annee": 1, + "formation.rncp": 1, }, { unique: true }, ], diff --git a/shared/models/data/effectifsDECA.model.ts b/shared/models/data/effectifsDECA.model.ts index 192d85559..0897257b2 100644 --- a/shared/models/data/effectifsDECA.model.ts +++ b/shared/models/data/effectifsDECA.model.ts @@ -22,12 +22,15 @@ const collectionName = "effectifsDECA"; const indexes: [IndexSpecification, CreateIndexesOptions][] = [ [ { + organisme_id: 1, annee_scolaire: 1, - "formation.cfd": 1, id_erp_apprenant: 1, - organisme_id: 1, + "formation.cfd": 1, + "formation.rncp": 1, + }, + { + unique: true, }, - {}, ], [ {