Skip to content

Commit

Permalink
Merge pull request #275 from bento-platform/fix/workflow-runs-err
Browse files Browse the repository at this point in the history
fix: handle undefined file params in workflow run last content component
  • Loading branch information
davidlougheed authored Aug 8, 2023
2 parents 90031d0 + afc4d7f commit 7514b25
Showing 1 changed file with 35 additions and 29 deletions.
64 changes: 35 additions & 29 deletions src/components/manager/runs/RunLastContent.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,38 +112,44 @@ const processIngestions = (data, currentTables) => {
const currentTableIds = new Set((currentTables || []).map((table) => table.table_id));

const ingestionsByDataType = data.reduce((ingestions, run) => {
if (run.state === "COMPLETE") {
const dataType = run.details.request.tags.workflow_metadata.data_type;
const tableId = run.details.request.tags.table_id;
if (run.state !== "COMPLETE") {
return ingestions;
}

if (tableId === undefined || !currentTableIds.has(tableId)) {
return ingestions;
}
const fileNameKey = getFileInputsFromWorkflowMetadata(run.details.request.tags.workflow_metadata);
const filePaths = fileNameKey.flatMap(key =>
Array.isArray(run.details.request.workflow_params[key])
? run.details.request.workflow_params[key]
: [run.details.request.workflow_params[key]],
);
const fileNames = Array.isArray(filePaths)
? filePaths.map(fileNameFromPath)
: [fileNameFromPath(filePaths)];

const date = Date.parse(run.details.run_log.end_time);

const currentIngestion = { date, dataType, tableId, fileNames };
const dataTypeAndTableId = buildKeyFromRecord(currentIngestion);

if (ingestions[dataTypeAndTableId]) {
const existingDate = ingestions[dataTypeAndTableId].date;
if (date > existingDate) {
ingestions[dataTypeAndTableId].date = date;
}
ingestions[dataTypeAndTableId].fileNames.push(...fileNames);
} else {
ingestions[dataTypeAndTableId] = currentIngestion;
const { workflow_metadata: workflowMetadata, table_id: tableId } = run.details.request.tags;

if (tableId === undefined || !currentTableIds.has(tableId)) {
return ingestions;
}

const fileNames =
getFileInputsFromWorkflowMetadata(run.details.request.tags.workflow_metadata)
.flatMap(key => {
const paramValue = run.details.request.workflow_params[key];
if (!paramValue) {
// Key isn't in workflow params or is null
// - possibly optional field or something else going wrong
return [];
}
return Array.isArray(paramValue) ? paramValue : [paramValue];
})
.map(fileNameFromPath);

const date = Date.parse(run.details.run_log.end_time);

const currentIngestion = { date, dataType: workflowMetadata.data_type, tableId, fileNames };
const dataTypeAndTableId = buildKeyFromRecord(currentIngestion);

if (ingestions[dataTypeAndTableId]) {
const existingDate = ingestions[dataTypeAndTableId].date;
if (date > existingDate) {
ingestions[dataTypeAndTableId].date = date;
}
ingestions[dataTypeAndTableId].fileNames.push(...fileNames);
} else {
ingestions[dataTypeAndTableId] = currentIngestion;
}

return ingestions;
}, {});

Expand Down

0 comments on commit 7514b25

Please sign in to comment.