Skip to content

Commit

Permalink
Latest runs for jobs (#2901)
Browse files Browse the repository at this point in the history
* Return the latest runs for each job in the job listing.

* Adding `lastRunStates` query param for job listing.

* Editing tests.

* Bringing in jobs without a run state for tests.

* Applying Spotless formatting.

* Moving to list for lastRunStates.

Signed-off-by: phixMe <[email protected]>

---------

Signed-off-by: phixMe <[email protected]>
  • Loading branch information
phixMe authored Oct 8, 2024
1 parent 1d90f18 commit 65d57f7
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 33 deletions.
13 changes: 12 additions & 1 deletion api/src/main/java/marquez/api/JobResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.validation.Valid;
import javax.validation.constraints.Min;
Expand Down Expand Up @@ -42,6 +44,7 @@
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.common.models.RunState;
import marquez.common.models.TagName;
import marquez.common.models.Version;
import marquez.db.JobFacetsDao;
Expand Down Expand Up @@ -164,11 +167,19 @@ public Response listJobVersions(
@Produces(APPLICATION_JSON)
public Response list(
@PathParam("namespace") NamespaceName namespaceName,
@QueryParam("lastRunStates") List<RunState> lastRunStates,
@QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit,
@QueryParam("offset") @DefaultValue("0") @Min(value = 0) int offset) {
throwIfNotExists(namespaceName);

final List<Job> jobs = jobService.findAllWithRun(namespaceName.getValue(), limit, offset);
// default to all run states if not specified
if (lastRunStates.isEmpty()) {
lastRunStates = new ArrayList<>();
Collections.addAll(lastRunStates, RunState.values());
}

final List<Job> jobs =
jobService.findAllWithRun(namespaceName.getValue(), lastRunStates, limit, offset);
final int totalCount = jobService.countFor(namespaceName.getValue());
return Response.ok(new ResultsPage<>("jobs", jobs, totalCount)).build();
}
Expand Down
57 changes: 36 additions & 21 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,28 @@
import marquez.common.models.JobName;
import marquez.common.models.JobType;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunState;
import marquez.db.JobVersionDao.IoType;
import marquez.db.JobVersionDao.JobDataset;
import marquez.db.JobVersionDao.JobDatasetMapper;
import marquez.db.mappers.JobMapper;
import marquez.db.mappers.JobRowMapper;
import marquez.db.mappers.RunMapper;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
import marquez.service.models.Job;
import marquez.service.models.JobMeta;
import marquez.service.models.Run;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindList;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.postgresql.util.PGobject;

@RegisterRowMapper(JobRowMapper.class)
@RegisterRowMapper(JobMapper.class)
@RegisterRowMapper(JobDatasetMapper.class)
@RegisterRowMapper(RunMapper.class)
public interface JobDao extends BaseDao {

@SqlQuery(
Expand Down Expand Up @@ -138,14 +142,10 @@ default Optional<Job> findWithDatasetsAndRun(String namespaceName, String jobNam
Optional<Job> job = findJobByName(namespaceName, jobName);
job.ifPresent(
j -> {
Optional<Run> run = createRunDao().findByLatestJob(namespaceName, jobName);
run.ifPresentOrElse(
r -> this.setJobData(r, j),
() ->
this.setJobData(
createJobVersionDao()
.findCurrentInputOutputDatasetsFor(namespaceName, jobName),
j));
List<Run> runs = createRunDao().findByLatestJob(namespaceName, jobName, 1, 0);
this.setJobData(runs, j);
this.setJobDataset(
createJobVersionDao().findCurrentInputOutputDatasetsFor(namespaceName, jobName), j);
});
return job;
}
Expand Down Expand Up @@ -244,10 +244,18 @@ job_tags as (
ON f.run_uuid = jv.latest_run_uuid
LEFT OUTER JOIN job_tags jt
ON j.uuid = jt.uuid
LEFT JOIN runs r
ON r.uuid = jv.latest_run_uuid
WHERE
(r.current_run_state IN (<lastRunStates>) OR r.uuid IS NULL)
ORDER BY
j.name
""")
List<Job> findAll(String namespaceName, int limit, int offset);
List<Job> findAll(
String namespaceName,
@BindList("lastRunStates") List<RunState> lastRunStates,
int limit,
int offset);

@SqlQuery("SELECT count(*) FROM jobs_view AS j WHERE symlink_target_uuid IS NULL")
int count();
Expand All @@ -271,18 +279,19 @@ job_tags as (
+ "AND symlink_target_uuid IS NULL")
int countFor(String namespaceName);

default List<Job> findAllWithRun(String namespaceName, int limit, int offset) {
default List<Job> findAllWithRun(
String namespaceName, List<RunState> lastRunStates, int limit, int offset) {
RunDao runDao = createRunDao();
return findAll(namespaceName, limit, offset).stream()
return findAll(namespaceName, lastRunStates, limit, offset).stream()
.peek(
j ->
runDao
.findByLatestJob(namespaceName, j.getName().getValue())
.ifPresent(run -> this.setJobData(run, j)))
.collect(Collectors.toList());
j -> {
List<Run> runs = runDao.findByLatestJob(namespaceName, j.getName().getValue(), 10, 0);
this.setJobData(runs, j);
})
.toList();
}

default void setJobData(List<JobDataset> datasets, Job j) {
default void setJobDataset(List<JobDataset> datasets, Job j) {
Optional.of(
datasets.stream()
.filter(d -> d.ioType().equals(IoType.INPUT))
Expand All @@ -304,19 +313,25 @@ default void setJobData(List<JobDataset> datasets, Job j) {
.ifPresent(s -> j.setOutputs(s));
}

default void setJobData(Run run, Job j) {
j.setLatestRun(run);
default void setJobData(List<Run> runs, Job j) {
if (runs.isEmpty()) {
return;
}

Run latestRun = runs.get(0);
j.setLatestRun(latestRun);
j.setLatestRuns(runs);
DatasetVersionDao datasetVersionDao = createDatasetVersionDao();
j.setInputs(
datasetVersionDao.findInputDatasetVersionsFor(run.getId().getValue()).stream()
datasetVersionDao.findInputDatasetVersionsFor(latestRun.getId().getValue()).stream()
.map(
ds ->
new DatasetId(
NamespaceName.of(ds.getNamespaceName()),
DatasetName.of(ds.getDatasetName())))
.collect(Collectors.toSet()));
j.setOutputs(
datasetVersionDao.findOutputDatasetVersionsFor(run.getId().getValue()).stream()
datasetVersionDao.findOutputDatasetVersionsFor(latestRun.getId().getValue()).stream()
.map(
ds ->
new DatasetId(
Expand Down
6 changes: 3 additions & 3 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -510,15 +510,15 @@ default RunRow upsertRunMeta(
@SqlQuery(
BASE_FIND_RUN_SQL
+ """
WHERE r.uuid=(
WHERE r.uuid IN (
SELECT r.uuid FROM runs_view r
INNER JOIN jobs_view j ON j.namespace_name=r.namespace_name AND j.name=r.job_name
WHERE j.namespace_name=:namespace AND (j.name=:jobName OR j.name=ANY(j.aliases))
ORDER BY transitioned_at DESC
LIMIT 1
LIMIT :limit OFFSET :offset
)
""")
Optional<Run> findByLatestJob(String namespace, String jobName);
List<Run> findByLatestJob(String namespace, String jobName, int limit, int offset);

@Builder
record RunUpsert(
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/mappers/JobMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public Job map(@NonNull ResultSet results, @NonNull StatementContext context)
// Latest Run is resolved in the JobDao. This can be brought in via a join
// and a jsonb but custom deserializers will need to be introduced
null,
null,
facetsOrNull,
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
getLabels(facetsOrNull),
Expand Down
8 changes: 8 additions & 0 deletions api/src/main/java/marquez/service/models/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.common.collect.ImmutableSet;
import java.net.URL;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -43,6 +44,7 @@ public final class Job {
@Nullable private final URL location;
@Nullable private final String description;
@Nullable @Setter private Run latestRun;
@Nullable @Setter private List<Run> latestRuns;
@Getter private final ImmutableMap<String, Object> facets;
@Nullable private UUID currentVersion;
@Getter @Nullable private ImmutableList<String> labels;
Expand All @@ -62,6 +64,7 @@ public Job(
@Nullable final URL location,
@Nullable final String description,
@Nullable final Run latestRun,
@Nullable final List<Run> latestRuns,
@Nullable final ImmutableMap<String, Object> facets,
@Nullable UUID currentVersion,
@Nullable ImmutableList<String> labels,
Expand All @@ -80,6 +83,7 @@ public Job(
this.location = location;
this.description = description;
this.latestRun = latestRun;
this.latestRuns = latestRuns;
this.facets = (facets == null) ? ImmutableMap.of() : facets;
this.currentVersion = currentVersion;
this.labels = (labels == null) ? ImmutableList.of() : labels;
Expand All @@ -98,6 +102,10 @@ public Optional<Run> getLatestRun() {
return Optional.ofNullable(latestRun);
}

/**
 * Returns the latest runs recorded on this job, if any have been set.
 *
 * <p>The backing {@code latestRuns} field is nullable, so the list is wrapped with
 * {@link Optional#ofNullable(Object)}: a {@code null} field yields {@link Optional#empty()},
 * while a non-null list (including an empty one) yields a present {@code Optional}.
 *
 * @return an {@code Optional} holding the latest runs, or empty when the field is {@code null}
 */
public Optional<List<Run>> getLatestRuns() {
return Optional.ofNullable(latestRuns);
}

public Optional<UUID> getCurrentVersion() {
return Optional.ofNullable(currentVersion);
}
Expand Down
8 changes: 7 additions & 1 deletion api/src/test/java/marquez/db/JobDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import marquez.common.models.JobType;
import marquez.common.models.RunState;
import marquez.db.models.DbModelGenerator;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
Expand Down Expand Up @@ -106,7 +109,10 @@ public void testFindAll() {
JobRow anotherJobSameNamespace =
createJobWithoutSymlinkTarget(jdbi, namespace, "anotherJob", "a random other job");

List<Job> jobs = jobDao.findAll(namespace.getName(), 10, 0);
List<RunState> runStates = new ArrayList<>();
Collections.addAll(runStates, RunState.values());

List<Job> jobs = jobDao.findAll(namespace.getName(), runStates, 10, 0);

// the symlinked job isn't present in the response - only the symlink target and the job with
// no symlink
Expand Down
12 changes: 6 additions & 6 deletions api/src/test/java/marquez/db/RunDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,9 @@ public void testFindByLatestJob() {
TreeSet<RunRow> sortedRuns =
new TreeSet<>(Comparator.comparing(RunRow::getUpdatedAt).reversed());
sortedRuns.addAll(runs);
Optional<Run> byLatestJob = runDao.findByLatestJob(jobRow.getNamespaceName(), jobRow.getName());
Run byLatestJob =
runDao.findByLatestJob(jobRow.getNamespaceName(), jobRow.getName(), 1, 0).get(0);
assertThat(byLatestJob)
.isPresent()
.get()
.hasFieldOrPropertyWithValue("id", new RunId(sortedRuns.first().getUuid()));

JobRow newTargetJob =
Expand All @@ -183,10 +182,11 @@ public void testFindByLatestJob() {
jobMeta.getDescription().orElse(null));

// get the latest run for the *newTargetJob*. It should be the same as the old job's latest run
byLatestJob = runDao.findByLatestJob(newTargetJob.getNamespaceName(), newTargetJob.getName());
byLatestJob =
runDao
.findByLatestJob(newTargetJob.getNamespaceName(), newTargetJob.getName(), 1, 0)
.get(0);
assertThat(byLatestJob)
.isPresent()
.get()
.hasFieldOrPropertyWithValue("id", new RunId(sortedRuns.first().getUuid()));
}

Expand Down
7 changes: 6 additions & 1 deletion api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -28,6 +29,7 @@
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.OutputDatasetVersion;
import marquez.common.models.RunState;
import marquez.db.DatasetDao;
import marquez.db.JobDao;
import marquez.db.LineageDao;
Expand Down Expand Up @@ -205,7 +207,10 @@ && jobNameEquals(n, "downstreamJob0<-outputData<-readJob0<-commonDataset"))
new NamespaceName(NAMESPACE),
new DatasetName("outputData<-readJob0<-commonDataset")));

List<Job> jobs = jobDao.findAllWithRun(NAMESPACE, 1000, 0);
List<RunState> runStates = new ArrayList<>();
Collections.addAll(runStates, RunState.values());

List<Job> jobs = jobDao.findAllWithRun(NAMESPACE, runStates, 1000, 0);
jobs =
jobs.stream()
.filter(j -> j.getName().getValue().contains("newDownstreamJob"))
Expand Down
5 changes: 5 additions & 0 deletions spec/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1596,6 +1596,10 @@ components:
type: string
latestRun:
$ref: '#/components/schemas/Run'
latestRuns:
type: array
items:
$ref: '#/components/schemas/Run'
facets:
$ref: '#/components/schemas/JobFacets'
currentVersion:
Expand All @@ -1617,6 +1621,7 @@ components:
context: {'SQL': "SELECT * FROM mytable;"}
description: My first job!
latestRun: null
latestRuns: []
facets: {}
currentVersion: "b1d626a2-6d3a-475e-9ecf-943176d4a8c6"

Expand Down

0 comments on commit 65d57f7

Please sign in to comment.