From 9e2e9b4346e75cdd141c0f4fef6697aa8481ef6c Mon Sep 17 00:00:00 2001 From: Peter Hicks Date: Fri, 25 Oct 2024 16:16:43 -0700 Subject: [PATCH] Lineage run attachment issue. (#2953) Signed-off-by: phixMe Co-authored-by: Willy Lulciuc --- api/src/main/java/marquez/MarquezContext.java | 2 +- .../marquez/db/mappers/JobDataMapper.java | 1 + .../java/marquez/service/LineageService.java | 21 +++++++------------ .../java/marquez/service/models/JobData.java | 1 + .../marquez/service/LineageServiceTest.java | 4 +++- 5 files changed, 14 insertions(+), 15 deletions(-) diff --git a/api/src/main/java/marquez/MarquezContext.java b/api/src/main/java/marquez/MarquezContext.java index 26421092e8..dd789e82ed 100644 --- a/api/src/main/java/marquez/MarquezContext.java +++ b/api/src/main/java/marquez/MarquezContext.java @@ -153,7 +153,7 @@ private MarquezContext( this.tagService = new TagService(baseDao); this.tagService.init(tags); this.openLineageService = new OpenLineageService(baseDao, runService); - this.lineageService = new LineageService(lineageDao, jobDao); + this.lineageService = new LineageService(lineageDao, jobDao, runDao); this.columnLineageService = new ColumnLineageService(columnLineageDao, datasetFieldDao); this.searchService = new SearchService(searchConfig); this.statsService = new StatsService(statsDao); diff --git a/api/src/main/java/marquez/db/mappers/JobDataMapper.java b/api/src/main/java/marquez/db/mappers/JobDataMapper.java index ca68f5deaa..2e93f48292 100644 --- a/api/src/main/java/marquez/db/mappers/JobDataMapper.java +++ b/api/src/main/java/marquez/db/mappers/JobDataMapper.java @@ -41,6 +41,7 @@ public JobData map(@NonNull ResultSet results, @NonNull StatementContext context stringOrThrow(results, Columns.SIMPLE_NAME), stringOrNull(results, Columns.PARENT_JOB_NAME), uuidOrNull(results, Columns.PARENT_JOB_UUID), + uuidOrNull(results, Columns.CURRENT_RUN_UUID), timestampOrThrow(results, Columns.CREATED_AT), timestampOrThrow(results, Columns.UPDATED_AT), NamespaceName.of(stringOrThrow(results, Columns.NAMESPACE_NAME)), diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java index 3d6c7a3ea9..f9b19cc654 100644 --- a/api/src/main/java/marquez/service/LineageService.java +++ b/api/src/main/java/marquez/service/LineageService.java @@ -36,6 +36,7 @@ import marquez.db.LineageDao.DatasetSummary; import marquez.db.LineageDao.JobSummary; import marquez.db.LineageDao.RunSummary; +import marquez.db.RunDao; import marquez.db.models.JobRow; import marquez.service.DelegatingDaos.DelegatingLineageDao; import marquez.service.LineageService.UpstreamRunLineage; @@ -58,9 +59,12 @@ public record UpstreamRun(JobSummary job, RunSummary run, List i private final JobDao jobDao; - public LineageService(LineageDao delegate, JobDao jobDao) { + private final RunDao runDao; + + public LineageService(LineageDao delegate, JobDao jobDao, RunDao runDao) { super(delegate); this.jobDao = jobDao; + this.runDao = runDao; } // TODO make input parameters easily extendable if adding more options like 'withJobFacets' @@ -89,20 +93,11 @@ public Lineage lineage(NodeId nodeId, int depth) { return toLineageWithOrphanDataset(nodeId.asDatasetId()); } - List runs = - getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet())); - for (JobData j : jobData) { - if (j.getLatestRun().isEmpty()) { - for (Run run : runs) { - if (j.getName().getValue().equalsIgnoreCase(run.getJobName()) - && j.getNamespace().getValue().equalsIgnoreCase(run.getNamespaceName())) { - j.setLatestRun(run); - break; - } - } - } + Optional run = runDao.findRunByUuid(j.getCurrentRunUuid()); + run.ifPresent(j::setLatestRun); } + Set datasetIds = jobData.stream() .flatMap(jd -> Stream.concat(jd.getInputUuids().stream(), jd.getOutputUuids().stream())) diff --git a/api/src/main/java/marquez/service/models/JobData.java b/api/src/main/java/marquez/service/models/JobData.java index 8495545347..f7f625a9b0 100644 --- a/api/src/main/java/marquez/service/models/JobData.java +++ b/api/src/main/java/marquez/service/models/JobData.java @@ -35,6 +35,7 @@ public class JobData implements NodeData { @NonNull String simpleName; @Nullable String parentJobName; @Nullable UUID parentJobUuid; + @Getter @Nullable UUID currentRunUuid; @NonNull Instant createdAt; @NonNull Instant updatedAt; @NonNull NamespaceName namespace; diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java index c439a4b171..74ec7ccd44 100644 --- a/api/src/test/java/marquez/service/LineageServiceTest.java +++ b/api/src/test/java/marquez/service/LineageServiceTest.java @@ -36,6 +36,7 @@ import marquez.db.LineageTestUtils.DatasetConsumerJob; import marquez.db.LineageTestUtils.JobLineage; import marquez.db.OpenLineageDao; +import marquez.db.RunDao; import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import marquez.service.LineageService.UpstreamRunLineage; @@ -86,7 +87,8 @@ public class LineageServiceTest { public static void setUpOnce(Jdbi jdbi) { LineageServiceTest.jdbi = jdbi; lineageDao = jdbi.onDemand(LineageDao.class); - lineageService = new LineageService(lineageDao, jdbi.onDemand(JobDao.class)); + lineageService = + new LineageService(lineageDao, jdbi.onDemand(JobDao.class), jdbi.onDemand(RunDao.class)); openLineageDao = jdbi.onDemand(OpenLineageDao.class); datasetDao = jdbi.onDemand(DatasetDao.class); jobDao = jdbi.onDemand(JobDao.class);