Skip to content

Commit

Permalink
feat: consolidation des effectifs autour du RNCP/CFD (#3935)
Browse files Browse the repository at this point in the history
  • Loading branch information
moroine authored Dec 18, 2024
1 parent 9ded7ab commit 2aad81e
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 85 deletions.
2 changes: 1 addition & 1 deletion .talismanrc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fileignoreconfig:
- filename: .bin/scripts/setup-local-env.sh
checksum: c688cb656af49c2f824ae7a0c9664bf7bdecdcf29d93ec4a99976fb08dcd9285
- filename: .infra/files/configs/mongodb/seed.gpg
checksum: 086c2803a427b3f3c48906bcfd04cd36fe61b3a757c349aa345371e47bdfb505
checksum: b28ed98d8847151cf88eb36d3f7f45804138789ada46b2c00964f35b86f5c609
- filename: .infra/vault/vault.yml
checksum: 796635402459cd6f6edb18a01b54bb2ecbb75067335ec7b3a6fd291dfd25e806
- filename: server/src/common/actions/formations.actions.ts
Expand Down
46 changes: 32 additions & 14 deletions server/src/common/actions/engine/engine.actions.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { isEqual } from "date-fns";
import { cloneDeep, get } from "lodash-es";
import { Collection, WithoutId } from "mongodb";
import { cloneDeep } from "lodash-es";
import { Collection } from "mongodb";
import { IEffectif } from "shared/models/data/effectifs.model";
import { IEffectifDECA } from "shared/models/data/effectifsDECA.model";
import { IEffectifQueue } from "shared/models/data/effectifsQueue.model";
Expand Down Expand Up @@ -91,19 +91,37 @@ export const completeEffectifAddress = async <T extends { apprenant: Partial<IEf
return effectifDataWithAddress;
};

/**
 * Looks for an already stored effectif that the given effectif should update.
 *
 * Candidates are fetched on the stable part of the uniqueness key:
 *  - id_erp_apprenant: unique identifier of the apprentice inside the CFA
 *  - organisme_id: identifier of the apprenticeship training organisation
 *  - annee_scolaire: school year of the apprentice for this training in this organisation
 *
 * Candidates are then filtered in memory on the training codes (`formation.cfd`
 * and `formation.rncp`). A code that is missing (null/undefined) on either side
 * is treated as a wildcard, so an effectif stored without a CFD or RNCP is
 * considered the same record and gets updated instead of duplicated.
 *
 * @param effectif Key fields of the effectif being ingested.
 * @param db Collection to search in.
 * @returns The first matching stored effectif (in the order returned by the
 *          query), or `null` when none matches.
 */
export const checkIfEffectifExists = async <E extends IEffectif | IEffectifDECA>(
  effectif: Pick<E, "id_erp_apprenant" | "organisme_id" | "annee_scolaire" | "formation">,
  db: Collection<any>
): Promise<E | null> => {
  const effectifs = await db
    .find({
      id_erp_apprenant: effectif.id_erp_apprenant,
      organisme_id: effectif.organisme_id,
      annee_scolaire: effectif.annee_scolaire,
    })
    .toArray();

  const newCfd = effectif.formation?.cfd ?? null;
  const newRncp = effectif.formation?.rncp ?? null;

  const matchingEffectif = effectifs.find((eff) => {
    const currentCfd = eff.formation?.cfd ?? null;
    const currentRncp = eff.formation?.rncp ?? null;

    // A null code on either side means "unknown": treat it as compatible so the
    // stored effectif is updated rather than a new one being created.
    const isSameCfd = currentCfd === newCfd || currentCfd === null || newCfd === null;
    const isSameRncp = currentRncp === newRncp || currentRncp === null || newRncp === null;

    return isSameCfd && isSameRncp;
  });

  return matchingEffectif ?? null;
};

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import { addJob } from "job-processor";
import type { ObjectId } from "mongodb";
import { MOTIF_SUPPRESSION } from "shared/constants";

import { softDeleteEffectif } from "@/common/actions/effectifs.actions";
import { effectifsDb, effectifsDECADb } from "@/common/model/collections";
import { recreateIndexes } from "@/jobs/db/recreateIndexes";

/** Reference to one document of a duplicate group. */
type DuplicateDoc = {
  id: ObjectId;
  updated_at?: Date | null;
};

/** One group of documents sharing the same uniqueness key, as produced by the pipeline below. */
type DuplicateGroup = {
  _id: {
    annee_scolaire: string;
    cfd: string;
    rncp: string;
    id_erp_apprenant: string;
    organisme_id: string;
  };
  count: number;
  docs: DuplicateDoc[];
};

/**
 * Groups documents on (annee_scolaire, formation.cfd, formation.rncp,
 * id_erp_apprenant, organisme_id) and keeps only the groups holding more
 * than one document.
 */
const DUPLICATE_GROUPS_PIPELINE = [
  {
    $group: {
      _id: {
        annee_scolaire: "$annee_scolaire",
        cfd: "$formation.cfd",
        rncp: "$formation.rncp",
        id_erp_apprenant: "$id_erp_apprenant",
        organisme_id: "$organisme_id",
      },
      count: {
        $sum: 1,
      },
      docs: {
        $addToSet: {
          id: "$_id",
          updated_at: "$updated_at",
        },
      },
    },
  },
  {
    $match: { count: { $gt: 1 } },
  },
];

/**
 * Returns the documents of a duplicate group that must be removed, i.e. every
 * document except the most recently updated one. Documents without an
 * `updated_at` are considered the oldest.
 */
const selectDocsToRemove = (docs: DuplicateDoc[]): DuplicateDoc[] => {
  const updatedAtMs = (doc: DuplicateDoc): number => doc.updated_at?.getTime() ?? 0;
  const lastUpdatedDoc = docs.reduce((acc, doc) => (updatedAtMs(doc) > updatedAtMs(acc) ? doc : acc), docs[0]);

  return docs.filter((doc) => doc !== lastUpdatedDoc);
};

/**
 * Migration: removes duplicated effectifs / effectifs DECA so that the new
 * indexes can be created, recreates the indexes, drops the former unique index
 * on effectifs and queues the follow-up "tmp:migration:duplicat-formation" job.
 *
 * For each group of duplicates only the most recently updated document is kept.
 */
export const up = async () => {
  const effectifDuplicats = await effectifsDb().aggregate<DuplicateGroup>(DUPLICATE_GROUPS_PIPELINE).toArray();

  for (const { docs } of effectifDuplicats) {
    // Soft-delete every stale copy; the most recently updated document stays in place.
    for (const doc of selectDocsToRemove(docs)) {
      await softDeleteEffectif(doc.id, null, {
        motif: MOTIF_SUPPRESSION.Doublon,
        description: "Suppression du doublon suite à la migration index unique sur effectifs",
      });
    }
  }

  const effectifDecaDuplicats = await effectifsDECADb().aggregate<DuplicateGroup>(DUPLICATE_GROUPS_PIPELINE).toArray();

  for (const { docs } of effectifDecaDuplicats) {
    const idsToRemove = selectDocsToRemove(docs).map((doc) => doc.id);

    // Restrict the deletion to this group's stale copies: a bare `$ne` filter
    // would wipe every other document of the collection.
    if (idsToRemove.length > 0) {
      await effectifsDECADb().deleteMany({ _id: { $in: idsToRemove } });
    }
  }

  // Recreate indexes before dropping unique index as it will be used to create new indexes
  await recreateIndexes({ drop: false });

  // DROP unique index
  await effectifsDb().dropIndex(
    "organisme_id_1_annee_scolaire_1_id_erp_apprenant_1_apprenant.nom_1_apprenant.prenom_1_formation.cfd_1_formation.annee_1"
  );

  await addJob({
    name: "tmp:migration:duplicat-formation",
    queued: true,
  });
};
37 changes: 26 additions & 11 deletions server/src/jobs/hydrate/deca/hydrate-deca-raw.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { normalize } from "path";

import { captureException } from "@sentry/node";
import { MongoClient, ObjectId, WithoutId } from "mongodb";
import { MongoClient, MongoError, ObjectId, WithoutId } from "mongodb";
import { SOURCE_APPRENANT } from "shared/constants";
import { IOrganisme } from "shared/models";
import { IRawBalDeca } from "shared/models/data/airbyteRawBalDeca.model";
Expand Down Expand Up @@ -96,20 +96,35 @@ export async function hydrateDecaRaw() {
}
}

/**
 * Inserts the given DECA effectif, or updates the stored one when
 * `checkIfEffectifExists` finds a match, and increments the matching counter.
 *
 * @param effectif DECA effectif to persist (without `_id`).
 * @param count Counters mutated in place: `created` on insert, `updated` on update.
 * @param retry When true, a duplicate-key failure on insert triggers one more
 *              attempt (which is expected to find the document and update it).
 * @throws Any insertion error that is not a retried duplicate-key error.
 */
async function upsertEffectifDeca(
  effectif: WithoutId<IEffectifDECA>,
  count: { created: number; updated: number },
  retry = true
): Promise<void> {
  const effectifFound = await checkIfEffectifExists<IEffectifDECA>(effectif, effectifsDECADb());

  if (effectifFound) {
    await effectifsDECADb().updateOne({ _id: effectifFound._id }, { $set: effectif });
    count.updated++;
    return;
  }

  try {
    await effectifsDECADb().insertOne({ ...effectif, _id: new ObjectId() });
    count.created++;
  } catch (err) {
    // Error code 11000 is a unique index violation. It happens when the same
    // effectif is processed concurrently from the queue: retry once so the
    // second pass finds the freshly inserted document and updates it.
    if (retry && err instanceof MongoError && err.code === 11000) {
      return upsertEffectifDeca(effectif, count, false);
    }

    // Anything else (or a second duplicate-key failure) must not be silently lost.
    throw err;
  }
}

async function updateEffectifDeca(document: IRawBalDeca, count: { created: number; updated: number }) {
const newDocuments: WithoutId<IEffectifDECA>[] = await transformDocument(document);
const organismesId = new Set<string>();
for (const newDocument of newDocuments) {
await upsertEffectifDeca(newDocument, count);
organismesId.add(newDocument.organisme_id.toString());
}

Expand Down
2 changes: 1 addition & 1 deletion server/src/jobs/ingestion/process-ingestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ const createOrUpdateEffectif = async (
},
};
const itemProcessingInfos: ItemProcessingInfos = {};
let effectifDb = await checkIfEffectifExists(effectifWithComputedFields, effectifsDb());
let effectifDb = await checkIfEffectifExists<IEffectif>(effectifWithComputedFields, effectifsDb());
itemProcessingInfos.effectif_new = !effectifDb;

try {
Expand Down
Loading

0 comments on commit 2aad81e

Please sign in to comment.