Skip to content

Commit

Permalink
Lineage run attachment issue. (#2953)
Browse files Browse the repository at this point in the history
Signed-off-by: phixMe <[email protected]>
Co-authored-by: Willy Lulciuc <[email protected]>
  • Loading branch information
phixMe and wslulciuc authored Oct 25, 2024
1 parent 46bbfa7 commit 9e2e9b4
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 15 deletions.
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private MarquezContext(
this.tagService = new TagService(baseDao);
this.tagService.init(tags);
this.openLineageService = new OpenLineageService(baseDao, runService);
this.lineageService = new LineageService(lineageDao, jobDao);
this.lineageService = new LineageService(lineageDao, jobDao, runDao);
this.columnLineageService = new ColumnLineageService(columnLineageDao, datasetFieldDao);
this.searchService = new SearchService(searchConfig);
this.statsService = new StatsService(statsDao);
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/mappers/JobDataMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public JobData map(@NonNull ResultSet results, @NonNull StatementContext context
stringOrThrow(results, Columns.SIMPLE_NAME),
stringOrNull(results, Columns.PARENT_JOB_NAME),
uuidOrNull(results, Columns.PARENT_JOB_UUID),
uuidOrNull(results, Columns.CURRENT_RUN_UUID),
timestampOrThrow(results, Columns.CREATED_AT),
timestampOrThrow(results, Columns.UPDATED_AT),
NamespaceName.of(stringOrThrow(results, Columns.NAMESPACE_NAME)),
Expand Down
21 changes: 8 additions & 13 deletions api/src/main/java/marquez/service/LineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import marquez.db.LineageDao.DatasetSummary;
import marquez.db.LineageDao.JobSummary;
import marquez.db.LineageDao.RunSummary;
import marquez.db.RunDao;
import marquez.db.models.JobRow;
import marquez.service.DelegatingDaos.DelegatingLineageDao;
import marquez.service.LineageService.UpstreamRunLineage;
Expand All @@ -58,9 +59,12 @@ public record UpstreamRun(JobSummary job, RunSummary run, List<DatasetSummary> i

private final JobDao jobDao;

public LineageService(LineageDao delegate, JobDao jobDao) {
private final RunDao runDao;

/**
 * Creates a lineage service backed by the given DAOs.
 *
 * @param delegate lineage DAO handed to the superclass constructor
 * @param jobDao job DAO stored in the {@code jobDao} field
 * @param runDao run DAO stored in the {@code runDao} field; used in this class to look up a
 *     job's current run by UUID
 */
public LineageService(LineageDao delegate, JobDao jobDao, RunDao runDao) {
super(delegate);
this.jobDao = jobDao;
this.runDao = runDao;
}

// TODO make input parameters easily extendable if adding more options like 'withJobFacets'
Expand Down Expand Up @@ -89,20 +93,11 @@ public Lineage lineage(NodeId nodeId, int depth) {
return toLineageWithOrphanDataset(nodeId.asDatasetId());
}

List<Run> runs =
getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()));

for (JobData j : jobData) {
if (j.getLatestRun().isEmpty()) {
for (Run run : runs) {
if (j.getName().getValue().equalsIgnoreCase(run.getJobName())
&& j.getNamespace().getValue().equalsIgnoreCase(run.getNamespaceName())) {
j.setLatestRun(run);
break;
}
}
}
Optional<Run> run = runDao.findRunByUuid(j.getCurrentRunUuid());
run.ifPresent(j::setLatestRun);
}

Set<UUID> datasetIds =
jobData.stream()
.flatMap(jd -> Stream.concat(jd.getInputUuids().stream(), jd.getOutputUuids().stream()))
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/marquez/service/models/JobData.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class JobData implements NodeData {
@NonNull String simpleName;
@Nullable String parentJobName;
@Nullable UUID parentJobUuid;
@Getter @Nullable UUID currentRunUuid;
@NonNull Instant createdAt;
@NonNull Instant updatedAt;
@NonNull NamespaceName namespace;
Expand Down
4 changes: 3 additions & 1 deletion api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import marquez.db.LineageTestUtils.DatasetConsumerJob;
import marquez.db.LineageTestUtils.JobLineage;
import marquez.db.OpenLineageDao;
import marquez.db.RunDao;
import marquez.db.models.UpdateLineageRow;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.LineageService.UpstreamRunLineage;
Expand Down Expand Up @@ -86,7 +87,8 @@ public class LineageServiceTest {
public static void setUpOnce(Jdbi jdbi) {
LineageServiceTest.jdbi = jdbi;
lineageDao = jdbi.onDemand(LineageDao.class);
lineageService = new LineageService(lineageDao, jdbi.onDemand(JobDao.class));
lineageService =
new LineageService(lineageDao, jdbi.onDemand(JobDao.class), jdbi.onDemand(RunDao.class));
openLineageDao = jdbi.onDemand(OpenLineageDao.class);
datasetDao = jdbi.onDemand(DatasetDao.class);
jobDao = jdbi.onDemand(JobDao.class);
Expand Down

0 comments on commit 9e2e9b4

Please sign in to comment.