Skip to content

Commit

Permalink
feat: mise en place DECA Raw de BAL (#3817)
Browse files Browse the repository at this point in the history
Co-authored-by: Paul G. <[email protected]>
Co-authored-by: Paul Gaucher <[email protected]>
  • Loading branch information
3 people authored Sep 19, 2024
1 parent a5e79a0 commit 296b1cc
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 36 deletions.
11 changes: 5 additions & 6 deletions server/src/common/actions/engine/engine.actions.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { isEqual } from "date-fns";
import { cloneDeep, get } from "lodash-es";
import { Collection } from "mongodb";
import { DEPARTEMENTS_BY_CODE, ACADEMIES_BY_CODE, REGIONS_BY_CODE } from "shared";
import { IEffectif } from "shared/models/data/effectifs.model";
import { IEffectifDECA } from "shared/models/data/effectifsDECA.model";
import { IEffectifQueue } from "shared/models/data/effectifsQueue.model";
import { PartialDeep } from "type-fest";

import { getCodePostalInfo } from "@/common/apis/apiTablesCorrespondances";
import logger from "@/common/logger";
import { effectifsDb } from "@/common/model/collections";
import { stripEmptyFields } from "@/common/utils/miscUtils";

/**
Expand Down Expand Up @@ -103,14 +104,12 @@ export const completeEffectifAddress = async <T extends { apprenant: Partial<IEf
* formation.cfd : Code formation diplôme de la formation suivie par le jeune
* annee_scolaire : Année scolaire dans laquelle se trouve le jeune pour cette formation dans cet établissement
*/
export const checkIfEffectifExists = async (
effectif: IEffectif,
queryKeys = ["id_erp_apprenant", "organisme_id", "formation.cfd", "annee_scolaire"]
) => {
export const checkIfEffectifExists = async (effectif: IEffectif | IEffectifDECA, db: Collection<any>) => {
const queryKeys = ["id_erp_apprenant", "organisme_id", "formation.cfd", "annee_scolaire"];
// Recherche de l'effectif via sa clé d'unicité
const query = queryKeys.reduce((acc, item) => ({ ...acc, [item]: get(effectif, item) }), {});

return await effectifsDb().findOne(query);
return await db.findOne(query);
};

/**
Expand Down
82 changes: 53 additions & 29 deletions server/src/jobs/hydrate/deca/hydrate-deca-raw.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,44 @@
import { normalize } from "path";

import { captureException } from "@sentry/node";
import { ObjectId, WithoutId } from "mongodb";
import { MongoClient, ObjectId } from "mongodb";
import { SOURCE_APPRENANT } from "shared/constants";
import { IEffectif, IOrganisme } from "shared/models";
import { IDecaRaw } from "shared/models/data/decaRaw.model";
import { IAirbyteRawBalDeca } from "shared/models/data/airbyteRawBalDeca.model";
import { zApprenant } from "shared/models/data/effectifs/apprenant.part";
import { zContrat } from "shared/models/data/effectifs/contrat.part";
import { IEffectifDECA } from "shared/models/data/effectifsDECA.model";
import { zodOpenApi } from "shared/models/zodOpenApi";
import { cyrb53Hash, getYearFromDate } from "shared/utils";

import { addComputedFields } from "@/common/actions/effectifs.actions";
import { checkIfEffectifExists } from "@/common/actions/engine/engine.actions";
import { getOrganismeByUAIAndSIRET } from "@/common/actions/organismes/organismes.actions";
import parentLogger from "@/common/logger";
import { decaRawDb, effectifsDECADb } from "@/common/model/collections";
import { effectifsDECADb } from "@/common/model/collections";
import { __dirname } from "@/common/utils/esmUtils";

const logger = parentLogger.child({ module: "job:hydrate:contrats-deca-raw" });

const client = new MongoClient(process.env.MNA_TDB_MONGODB_URI ?? "");

export async function hydrateDecaRaw() {
let count = 0;

try {
await effectifsDECADb().drop();
await client.connect();

const query = {
"_airbyte_data.dispositif": "APPR",
"_airbyte_data.organisme_formation.uai_cfa": { $exists: true },
"_airbyte_data.organisme_formation.siret": { $exists: true },
"_airbyte_data.formation.date_debut_formation": { $exists: true },
"_airbyte_data.formation.date_fin_formation": { $exists: true },
};

const cursor = client.db("airbyte").collection<IAirbyteRawBalDeca>("airbyte_raw_bal_deca").find(query);

const cursor = decaRawDb().find({
dispositif: "APPR",
"organisme_formation.uai_cfa": { $exists: true },
"organisme_formation.siret": { $exists: true },
"formation.date_debut_formation": { $exists: true },
"formation.date_fin_formation": { $exists: true },
});
// await effectifsDECADb().drop();

for await (const document of cursor) {
try {
Expand All @@ -53,14 +60,20 @@ export async function hydrateDecaRaw() {
}
}

async function updateEffectifDeca(document: IDecaRaw) {
const newDocument = await transformDocument(document);
async function updateEffectifDeca(document: IAirbyteRawBalDeca) {
const newDocument: IEffectifDECA = await transformDocument(document);

return await effectifsDECADb().insertOne(newDocument as IEffectifDECA);
}
const effectifFound = await checkIfEffectifExists(newDocument, effectifsDECADb());

async function transformDocument(document: IDecaRaw): Promise<WithoutId<IEffectifDECA>> {
if (effectifFound) {
return await effectifsDECADb().insertOne(newDocument);
} else {
return await effectifsDECADb().updateOne({ _id: effectifFound._id }, newDocument);
}
}
async function transformDocument(document: IAirbyteRawBalDeca): Promise<IEffectifDECA> {
const {
_id,
alternant,
formation,
employeur,
Expand All @@ -69,7 +82,8 @@ async function transformDocument(document: IDecaRaw): Promise<WithoutId<IEffecti
date_fin_contrat,
date_effet_rupture,
type_contrat,
} = document;
} = document._airbyte_data;

const {
nom,
prenom,
Expand All @@ -82,12 +96,22 @@ async function transformDocument(document: IDecaRaw): Promise<WithoutId<IEffecti
adresse: adresseAlternant,
derniere_classe,
} = alternant;

const { date_debut_formation, date_fin_formation, code_diplome, rncp, intitule_ou_qualification } = formation;
const { siret, denomination, naf, adresse, nombre_de_salaries } = employeur;
const { uai_cfa, siret: orgSiret } = organisme_formation;

const startYear = getYearFromDate(date_debut_formation);
const endYear = getYearFromDate(date_fin_formation);
const dateDebutContrat = date_debut_contrat ? new Date(date_debut_contrat) : null;
const dateFinContrat = date_fin_contrat ? new Date(date_fin_contrat) : null;
const dateEffetRupture = date_effet_rupture ? new Date(date_effet_rupture) : null;

const dateDebutFormation = date_debut_formation ? new Date(date_debut_formation) : null;
const dateFinFormation = date_fin_formation ? new Date(date_fin_formation) : null;

const dateNaissance = date_naissance ? new Date(date_naissance) : null;

const startYear = getYearFromDate(dateDebutFormation);
const endYear = getYearFromDate(dateFinFormation);

const organisme: IOrganisme = await getOrganismeByUAIAndSIRET(uai_cfa, orgSiret);

Expand All @@ -99,7 +123,7 @@ async function transformDocument(document: IDecaRaw): Promise<WithoutId<IEffecti
throw new Error("L'année de début et l'année de fin doivent être définies");
}

if (!date_debut_contrat || !date_fin_contrat) {
if (!dateDebutContrat || !dateFinContrat) {
throw new Error("Les dates de début et de fin de contrat sont requises");
}

Expand All @@ -109,12 +133,12 @@ async function transformDocument(document: IDecaRaw): Promise<WithoutId<IEffecti
apprenant: {
nom,
prenom,
date_de_naissance: date_naissance,
date_de_naissance: dateNaissance,
nationalite: nationalite as zodOpenApi.TypeOf<typeof zApprenant>["nationalite"],
historique_statut: [],
has_nir: false,
rqth: handicap,
sexe: sexe === 1 ? "M" : sexe === 0 ? "F" : null,
sexe: sexe === "H" ? "M" : sexe === "F" ? "F" : null,
telephone,
courriel,
adresse: {
Expand All @@ -131,9 +155,9 @@ async function transformDocument(document: IDecaRaw): Promise<WithoutId<IEffecti
type_employeur: parseInt(type_contrat, 10) as zodOpenApi.TypeOf<typeof zContrat>["type_employeur"],
naf,
adresse: { code_postal: adresse.code_postal },
date_debut: date_debut_contrat,
date_fin: date_fin_contrat,
date_rupture: date_effet_rupture,
date_debut: dateDebutContrat,
date_fin: dateFinContrat,
date_rupture: dateEffetRupture,
nombre_de_salaries,
},
],
Expand All @@ -143,17 +167,17 @@ async function transformDocument(document: IDecaRaw): Promise<WithoutId<IEffecti
periode: [startYear, endYear],
libelle_court: intitule_ou_qualification,
libelle_long: intitule_ou_qualification,
date_inscription: date_debut_formation,
date_entree: date_debut_formation,
date_fin: date_fin_formation,
date_inscription: dateDebutFormation,
date_entree: dateDebutFormation,
date_fin: dateFinFormation,
},
organisme_id: organisme._id,
organisme_responsable_id: organisme._id,
organisme_formateur_id: organisme._id,
validation_errors: [],
created_at: new Date(),
updated_at: new Date(),
id_erp_apprenant: cyrb53Hash(normalize(prenom || "").trim() + normalize(nom || "").trim() + (date_naissance || "")),
id_erp_apprenant: cyrb53Hash(normalize(prenom || "").trim() + normalize(nom || "").trim() + (dateNaissance || "")),
source: SOURCE_APPRENANT.DECA,
annee_scolaire: startYear <= 2023 && endYear >= 2024 ? "2023-2024" : `${startYear}-${endYear}`,
};
Expand Down
2 changes: 1 addition & 1 deletion server/src/jobs/ingestion/process-ingestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ const createOrUpdateEffectif = async (
},
};
const itemProcessingInfos: ItemProcessingInfos = {};
let effectifDb = await checkIfEffectifExists(effectifWithComputedFields);
let effectifDb = await checkIfEffectifExists(effectifWithComputedFields, effectifsDb());
itemProcessingInfos.effectif_new = !effectifDb;

try {
Expand Down
100 changes: 100 additions & 0 deletions shared/models/data/airbyteRawBalDeca.model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import type { CreateIndexesOptions, IndexSpecification } from "mongodb";
import { z } from "zod";
import { zObjectId } from "zod-mongodb-schema";

// Collection name
const collectionName = "airbyte_raw_bal_deca";

// Indexes
const indexes: [IndexSpecification, CreateIndexesOptions][] = [
[{ statut: 1 }, {}],
[{ "employeur.siret": 1 }, { unique: false }],
[{ "formation.code_diplome": 1 }, { unique: false }],
[{ created_at: 1 }, { unique: false }],
];

// Address schema
const zAdresse = z.object({
voie: z.string().optional(),
code_postal: z.string(),
numero: z.string().optional(),
});

// Alternant schema
const zAlternant = z.object({
nom: z.string(),
prenom: z.string(),
sexe: z.string(),
date_naissance: z.string().describe("Date de naissance de l'alternant").nullish(),
departement_naissance: z.string(),
nationalite: z.number(),
handicap: z.boolean(),
courriel: z.string(),
telephone: z.string(),
adresse: zAdresse,
derniere_classe: z.number(),
});

// Employeur schema
const zEmployeur = z.object({
siret: z.string(),
denomination: z.string(),
adresse: zAdresse.pick({ code_postal: true }),
naf: z.string(),
code_idcc: z.string(),
nombre_de_salaries: z.number(),
courriel: z.string().optional(),
telephone: z.string().optional(),
});

// Formation schema
const zFormation = z.object({
code_diplome: z.string(),
rncp: z.string(),
intitule_ou_qualification: z.string(),
type_diplome: z.string(),
date_debut_formation: z.string().describe("Date de début de la formation").nullish(),
date_fin_formation: z.string().describe("Date de fin de la formation").nullish(),
});

// Organisme formation schema
const zOrganismeFormation = z.object({
siret: z.string(),
uai_cfa: z.string(),
});

// Main schema for the airbyte data
const zAirbyteData = z.object({
_id: z.string(),
no_contrat: z.string(),
type_contrat: z.string(),
alternant: zAlternant,
date_debut_contrat: z.string().describe("Date de début du contrat").nullish(),
date_fin_contrat: z.string().describe("Date de fin du contrat").nullish(),
date_effet_rupture: z.string().describe("Date d'effet de la rupture du contrat").nullish(),
dispositif: z.string(),
employeur: zEmployeur,
organisme_formation: zOrganismeFormation,
formation: zFormation,
created_at: z.string().describe("Date de création de l'enregistrement dans la base de données").nullish(),
updated_at: z.string().describe("Date de dernière mise à jour de l'enregistrement dans la base de données").nullish(),
_ab_cdc_updated_at: z.string().optional(),
_ab_cdc_cursor: z
.object({
$numberLong: z.string(),
})
.optional(),
_ab_cdc_deleted_at: z.null().optional(),
});

const zAirbyteRawBalDeca = z.object({
_id: zObjectId.describe("Identifiant MongoDB de l'enregistrement"),
_airbyte_data: zAirbyteData,
_airbyte_data_hash: z.string(),
_airbyte_emitted_at: z.string().describe("Date à laquelle les données ont été émises").optional(),
});

// Define the type
export type IAirbyteRawBalDeca = z.infer<typeof zAirbyteRawBalDeca>;

export default { zod: zAirbyteRawBalDeca, indexes, collectionName };

0 comments on commit 296b1cc

Please sign in to comment.