Skip to content

Commit

Permalink
Merge pull request #121 from caraml-dev/fix-list-job-api
Browse files Browse the repository at this point in the history
bugfix: fixt restart streaming job and list scheduled jobs
  • Loading branch information
khorshuheng authored May 17, 2024
2 parents b7163a3 + 655dad7 commit 5ece7db
Show file tree
Hide file tree
Showing 6 changed files with 305 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ message Job {

// Spark job error message, if available
string error_message = 10;

// Project
string project = 11;
}

// Ingest data from offline store into online store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,13 @@ private Job sparkApplicationToJob(SparkApplication app) {
default -> JobStatus.JOB_STATUS_PENDING;
};
}
String project = labels.getOrDefault(PROJECT_LABEL, "");
String tableName = labels.getOrDefault(FEATURE_TABLE_LABEL, "");

Job.Builder builder =
Job.newBuilder()
.setId(app.getMetadata().getName())
.setProject(project)
.setStartTime(startTime)
.setStatus(jobStatus);
switch (JobType.valueOf(labels.get(JOB_TYPE_LABEL))) {
Expand Down Expand Up @@ -162,7 +164,7 @@ private ScheduledJob scheduledSparkApplicationToScheduledJob(ScheduledSparkAppli
public Job createOrUpdateStreamingIngestionJob(String project, String featureTableName) {
FeatureTableSpec spec =
tableRepository
.findFeatureTableByNameAndProject_NameAndIsDeletedFalse(project, featureTableName)
.findFeatureTableByNameAndProject_NameAndIsDeletedFalse(featureTableName, project)
.map(ft -> ft.toProto().getSpec())
.orElseThrow(
() -> {
Expand Down
Loading

0 comments on commit 5ece7db

Please sign in to comment.