#102 | Override all ETL Job endpoints to support 2 different jobGroups for each organisation: Sync and MediaAnalysis
himeshr committed Jul 16, 2024
1 parent 6b5b12c commit 774a615
Showing 8 changed files with 183 additions and 61 deletions.
22 changes: 14 additions & 8 deletions src/main/java/org/avniproject/etl/config/ScheduledJobConfig.java
@@ -1,6 +1,7 @@
package org.avniproject.etl.config;

import org.avniproject.etl.contract.backgroundJob.JobEntityType;
import org.avniproject.etl.contract.backgroundJob.JobGroup;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
@@ -12,24 +13,29 @@

@Component
public class ScheduledJobConfig {
public static final String SYNC_JOB_GROUP = "SyncJobs";
public static final String SYNC_TRIGGER_GROUP = "SyncTriggers";
public static final String JOB_CREATED_AT = "CreatedAt";
public static final String ENTITY_TYPE = "EntityType";

@Value("${avni.scheduledJob.repeatIntervalInMinutes}")
@Value("${avni.scheduledJob.sync.repeatIntervalInMinutes}")
private int repeatIntervalInMinutes;

public TriggerKey getTriggerKey(String organisationUUID) {
return new TriggerKey(organisationUUID, SYNC_TRIGGER_GROUP);
@Value("${avni.scheduledJob.mediaAnalysis.repeatIntervalInMinutes}")
private int mediaAnalysisRepeatIntervalInMinutes;

public TriggerKey getTriggerKey(String organisationUUID, JobGroup jobGroup) {
return new TriggerKey(organisationUUID, jobGroup.getTriggerName());
}

public int getRepeatIntervalInMinutes() {
public int getSyncRepeatIntervalInMinutes() {
return repeatIntervalInMinutes;
}

public JobKey getJobKey(String organisationUUID) {
return new JobKey(organisationUUID, SYNC_JOB_GROUP);
public int getMediaAnalysisRepeatIntervalInMinutes() {
return mediaAnalysisRepeatIntervalInMinutes;
}

public JobKey getJobKey(String organisationUUID, JobGroup jobGroup) {
return new JobKey(organisationUUID, jobGroup.getGroupName());
}

public String getEntityId(JobDetail jobDetail) {
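The single avni.scheduledJob.repeatIntervalInMinutes property is split here into sync- and mediaAnalysis-specific keys, so the two job groups can be scheduled on independent intervals. The property file itself is not part of the diff shown, so the application.properties sketch below uses illustrative values only:

    # Interval between runs of the regular ETL sync jobs (illustrative value)
    avni.scheduledJob.sync.repeatIntervalInMinutes=2
    # Interval between runs of the media analysis jobs (illustrative value)
    avni.scheduledJob.mediaAnalysis.repeatIntervalInMinutes=1440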
10 changes: 10 additions & 0 deletions src/main/java/org/avniproject/etl/contract/JobScheduleRequest.java
@@ -1,10 +1,12 @@
package org.avniproject.etl.contract;

import org.avniproject.etl.contract.backgroundJob.JobEntityType;
import org.avniproject.etl.contract.backgroundJob.JobGroup;

public class JobScheduleRequest {
private String entityUUID;
private JobEntityType jobEntityType;
private JobGroup jobGroup = JobGroup.Sync;

public String getEntityUUID() {
return entityUUID;
@@ -21,4 +23,12 @@ public JobEntityType getJobEntityType() {
public void setJobEntityType(JobEntityType jobEntityType) {
this.jobEntityType = jobEntityType;
}

public JobGroup getJobGroup() {
return jobGroup;
}

public void setJobGroup(JobGroup jobGroup) {
this.jobGroup = jobGroup;
}
}
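Because jobGroup defaults to JobGroup.Sync, existing callers of the scheduling endpoint keep their current behaviour; scheduling a media analysis job only requires the extra field. A hedged example request body (the UUID is a placeholder; field names come from this class):

    {
      "entityUUID": "<organisation-or-group-uuid>",
      "jobEntityType": "Organisation",
      "jobGroup": "MediaAnalysis"
    }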
20 changes: 20 additions & 0 deletions src/main/java/org/avniproject/etl/contract/backgroundJob/JobGroup.java
@@ -0,0 +1,20 @@
package org.avniproject.etl.contract.backgroundJob;

public enum JobGroup {
Sync("SyncJobs", "SyncTriggers"), MediaAnalysis("MediaAnalysisJobs", "MediaAnalysisTriggers");

String groupName, triggerName;

JobGroup(String groupName, String triggerName) {
this.groupName = groupName;
this.triggerName = triggerName;
}

public String getGroupName() {
return groupName;
}

public String getTriggerName() {
return triggerName;
}
}
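A minimal sketch of how this enum pairs with ScheduledJobConfig so one organisation can hold a job in each group without key clashes; the UUID is a placeholder and the snippet only mirrors what getJobKey and getTriggerKey build:

    // Hypothetical illustration: one organisation UUID, two independent Quartz identities.
    JobKey syncJob = new JobKey("org-uuid", JobGroup.Sync.getGroupName());                         // group "SyncJobs"
    JobKey mediaJob = new JobKey("org-uuid", JobGroup.MediaAnalysis.getGroupName());               // group "MediaAnalysisJobs"
    TriggerKey mediaTrigger = new TriggerKey("org-uuid", JobGroup.MediaAnalysis.getTriggerName()); // group "MediaAnalysisTriggers"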
EtlJobController.java
@@ -3,20 +3,14 @@
import org.apache.log4j.Logger;
import org.avniproject.etl.config.ScheduledJobConfig;
import org.avniproject.etl.contract.JobScheduleRequest;
import org.avniproject.etl.contract.backgroundJob.EtlJobHistoryItem;
import org.avniproject.etl.contract.backgroundJob.EtlJobStatus;
import org.avniproject.etl.contract.backgroundJob.EtlJobSummary;
import org.avniproject.etl.contract.backgroundJob.JobEntityType;
import org.avniproject.etl.contract.backgroundJob.*;
import org.avniproject.etl.domain.OrganisationIdentity;
import org.avniproject.etl.repository.OrganisationRepository;
import org.avniproject.etl.scheduler.EtlJob;
import org.avniproject.etl.scheduler.MediaAnalysisJob;
import org.avniproject.etl.service.backgroundJob.ScheduledJobService;
import org.avniproject.etl.util.DateTimeUtil;
import org.quartz.JobDataMap;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.*;
import org.quartz.impl.JobDetailImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
@@ -28,7 +22,6 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.avniproject.etl.config.ScheduledJobConfig.SYNC_JOB_GROUP;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

@@ -50,23 +43,22 @@ public EtlJobController(Scheduler scheduler, ScheduledJobConfig scheduledJobConf

@PreAuthorize("hasAnyAuthority('admin')")
@GetMapping("/job/{entityUUID}")
public ResponseEntity getJob(@PathVariable String entityUUID) throws SchedulerException {
EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(entityUUID);
if (latestJobRun == null)
return ResponseEntity.notFound().build();
public ResponseEntity getJob(@PathVariable String entityUUID, @RequestParam(value="jobGroup", required = false) JobGroup jobGroup) throws SchedulerException {
EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(entityUUID, jobGroup != null ? jobGroup : JobGroup.Sync);
if (latestJobRun == null) return ResponseEntity.notFound().build();
return new ResponseEntity(latestJobRun, HttpStatus.OK);
}

@PreAuthorize("hasAnyAuthority('admin')")
@PostMapping("/job/status")
public List<EtlJobStatus> getStatuses(@RequestBody List<String> organisationUUIDs) {
return scheduledJobService.getJobs(organisationUUIDs);
public List<EtlJobStatus> getStatuses(@RequestBody List<String> organisationUUIDs, @RequestParam(value="jobGroup", required = false) JobGroup jobGroup) {
return scheduledJobService.getJobs(organisationUUIDs, jobGroup != null ? jobGroup : JobGroup.Sync);
}

@PreAuthorize("hasAnyAuthority('admin')")
@GetMapping("/job/history/{entityUUID}")
public List<EtlJobHistoryItem> getJobHistory(@PathVariable String entityUUID) {
return scheduledJobService.getJobHistory(entityUUID);
public List<EtlJobHistoryItem> getJobHistory(@PathVariable String entityUUID, @RequestParam(value="jobGroup", required = false) JobGroup jobGroup){
return scheduledJobService.getJobHistory(entityUUID, jobGroup != null ? jobGroup : JobGroup.Sync);
}
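The getJob, getStatuses and getJobHistory endpoints above all accept the same optional jobGroup request parameter and fall back to JobGroup.Sync when it is absent, so existing clients are unaffected. Illustrative calls for the media analysis variant (any class-level request-mapping prefix on the controller is omitted here):

    GET /job/{entityUUID}?jobGroup=MediaAnalysis
    GET /job/history/{entityUUID}?jobGroup=MediaAnalysis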

@PreAuthorize("hasAnyAuthority('admin')")
@@ -82,39 +74,31 @@ public ResponseEntity createJob(@RequestBody JobScheduleRequest jobScheduleReque
if (organisationIdentity == null && organisationIdentitiesInGroup.size() == 0)
return ResponseEntity.badRequest().body(String.format("No such organisation or group exists: %s", jobScheduleRequest.getEntityUUID()));

EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(jobScheduleRequest.getEntityUUID());
if (latestJobRun != null)
return ResponseEntity.badRequest().body("Job already present");
EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(jobScheduleRequest.getEntityUUID(), jobScheduleRequest.getJobGroup());
if (latestJobRun != null) return ResponseEntity.badRequest().body("Job already present");

JobDetailImpl jobDetail = getJobDetail(jobScheduleRequest, organisationIdentity, organisationIdentitiesInGroup);
scheduler.addJob(jobDetail, false);
Trigger trigger = getTrigger(jobScheduleRequest, jobDetail);
scheduler.scheduleJob(trigger);
logger.info(String.format("Job Scheduled for %s:%s", jobScheduleRequest.getJobEntityType(), jobScheduleRequest.getEntityUUID()));
logger.info(String.format("%s type job Scheduled for %s:%s", jobScheduleRequest.getJobGroup(), jobScheduleRequest.getJobEntityType(), jobScheduleRequest.getEntityUUID()));
return ResponseEntity.ok().body("Job Scheduled!");
}

private Trigger getTrigger(JobScheduleRequest jobScheduleRequest, JobDetailImpl jobDetail) {
SimpleScheduleBuilder scheduleBuilder = simpleSchedule()
.withIntervalInMinutes(scheduledJobConfig.getRepeatIntervalInMinutes()).repeatForever();

Trigger trigger = newTrigger()
.withIdentity(scheduledJobConfig.getTriggerKey(jobScheduleRequest.getEntityUUID()))
.forJob(jobDetail)
.withSchedule(scheduleBuilder)
.startAt(DateTimeUtil.nowPlusSeconds(5))
.build();
SimpleScheduleBuilder scheduleBuilder = simpleSchedule().withIntervalInMinutes(jobScheduleRequest.getJobGroup().equals(JobGroup.Sync) ? scheduledJobConfig.getSyncRepeatIntervalInMinutes() : scheduledJobConfig.getMediaAnalysisRepeatIntervalInMinutes()).repeatForever();

Trigger trigger = newTrigger().withIdentity(scheduledJobConfig.getTriggerKey(jobScheduleRequest.getEntityUUID(), jobScheduleRequest.getJobGroup())).forJob(jobDetail).withSchedule(scheduleBuilder).startAt(DateTimeUtil.nowPlusSeconds(5)).build();
return trigger;
}

private JobDetailImpl getJobDetail(JobScheduleRequest jobScheduleRequest, OrganisationIdentity organisationIdentity, List<OrganisationIdentity> organisationIdentitiesInGroup) {
JobDetailImpl jobDetail = new JobDetailImpl();
jobDetail.setJobClass(EtlJob.class);
jobDetail.setJobClass(jobScheduleRequest.getJobGroup().equals(JobGroup.Sync) ? EtlJob.class : MediaAnalysisJob.class);
jobDetail.setDurability(true);
jobDetail.setKey(scheduledJobConfig.getJobKey(jobScheduleRequest.getEntityUUID()));
jobDetail.setDescription(organisationIdentity == null ?
organisationIdentitiesInGroup.stream().map(OrganisationIdentity::toString).collect(Collectors.joining(";")) : organisationIdentity.toString());
jobDetail.setGroup(SYNC_JOB_GROUP);
jobDetail.setKey(scheduledJobConfig.getJobKey(jobScheduleRequest.getEntityUUID(), jobScheduleRequest.getJobGroup()));
jobDetail.setDescription(organisationIdentity == null ? organisationIdentitiesInGroup.stream().map(OrganisationIdentity::toString).collect(Collectors.joining(";")) : organisationIdentity.toString());
jobDetail.setGroup(jobScheduleRequest.getJobGroup().getGroupName());
jobDetail.setName(jobScheduleRequest.getEntityUUID());
JobDataMap jobDataMap = scheduledJobConfig.createJobData(jobScheduleRequest.getJobEntityType());
jobDetail.setJobDataMap(jobDataMap);
@@ -123,8 +107,16 @@ private JobDetailImpl getJobDetail(JobScheduleRequest jobScheduleRequest, Organi

@PreAuthorize("hasAnyAuthority('admin')")
@DeleteMapping(value = "/job/{id}")
public String deleteJob(@PathVariable String id) throws SchedulerException {
boolean jobDeleted = scheduler.deleteJob(scheduledJobConfig.getJobKey(id));
return jobDeleted ? "Job Deleted" : "Job Not Deleted";
public String deleteJob(@PathVariable String id, @RequestParam(value="jobGroup", required = false) JobGroup jobGroup) throws SchedulerException {
boolean syncJobDeleted = scheduler.deleteJob(scheduledJobConfig.getJobKey(id, jobGroup != null ? jobGroup : JobGroup.Sync));
String responseMsg = String.format("Sync Job Deleted: %s; ",syncJobDeleted);
if (jobGroup != null && jobGroup == JobGroup.Sync) {
EtlJobSummary mediaJobRun = scheduledJobService.getLatestJobRun(id, JobGroup.MediaAnalysis);
if (mediaJobRun != null) {
boolean mediaAnalysisJobDeleted = scheduler.deleteJob(scheduledJobConfig.getJobKey(id, JobGroup.MediaAnalysis));
responseMsg = responseMsg.concat(String.format("MediaAnalysis Job Deleted: %s;", mediaAnalysisJobDeleted));
}
}
return responseMsg;
}
}
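Deletion follows the same pattern: the optional jobGroup parameter defaults to Sync, and when jobGroup=Sync is passed explicitly the controller also removes any MediaAnalysis job registered for the same entity. An illustrative request, again with any controller-level prefix omitted:

    DELETE /job/{id}?jobGroup=Sync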
30 changes: 30 additions & 0 deletions src/main/java/org/avniproject/etl/scheduler/MediaAnalysisJob.java
@@ -0,0 +1,30 @@
package org.avniproject.etl.scheduler;

import org.avniproject.etl.config.ScheduledJobConfig;
import org.avniproject.etl.contract.backgroundJob.JobEntityType;
import org.avniproject.etl.service.MediaAnalysisService;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MediaAnalysisJob implements Job {
private final MediaAnalysisService mediaAnalysisService;
private final ScheduledJobConfig scheduledJobConfig;

@Autowired
public MediaAnalysisJob(MediaAnalysisService mediaAnalysisService, ScheduledJobConfig scheduledJobConfig) {
this.mediaAnalysisService = mediaAnalysisService;
this.scheduledJobConfig = scheduledJobConfig;
}

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
JobDetail jobDetail = context.getJobDetail();
JobDataMap jobDataMap = jobDetail.getJobDataMap();
String entityId = scheduledJobConfig.getEntityId(jobDetail);
if (jobDataMap.get(ScheduledJobConfig.ENTITY_TYPE).equals(JobEntityType.Organisation))
mediaAnalysisService.runFor(entityId);
else mediaAnalysisService.runForOrganisationGroup(entityId);
}
}
57 changes: 57 additions & 0 deletions src/main/java/org/avniproject/etl/service/MediaAnalysisService.java
@@ -0,0 +1,57 @@
package org.avniproject.etl.service;

import org.apache.log4j.Logger;
import org.avniproject.etl.config.EtlServiceConfig;
import org.avniproject.etl.domain.OrgIdentityContextHolder;
import org.avniproject.etl.domain.Organisation;
import org.avniproject.etl.domain.OrganisationIdentity;
import org.avniproject.etl.repository.OrganisationRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class MediaAnalysisService {
private final OrganisationRepository organisationRepository;
private final OrganisationFactory organisationFactory;
private final SchemaMigrationService schemaMigrationService;
private final SyncService syncService;
private final EtlServiceConfig etlServiceConfig;
private static final Logger log = Logger.getLogger(MediaAnalysisService.class);

@Autowired
public MediaAnalysisService(OrganisationRepository organisationRepository, OrganisationFactory organisationFactory, SchemaMigrationService schemaMigrationService, SyncService syncService, EtlServiceConfig etlServiceConfig) {
this.organisationRepository = organisationRepository;
this.organisationFactory = organisationFactory;
this.schemaMigrationService = schemaMigrationService;
this.syncService = syncService;
this.etlServiceConfig = etlServiceConfig;
}

public void runFor(String organisationUUID) {
OrganisationIdentity organisationIdentity = organisationRepository.getOrganisation(organisationUUID);
this.runFor(organisationIdentity);
}

public void runForOrganisationGroup(String organisationGroupUUID) {
List<OrganisationIdentity> organisationIdentities = organisationRepository.getOrganisationGroup(organisationGroupUUID);
this.runFor(organisationIdentities);
}

public void runFor(List<OrganisationIdentity> organisationIdentities) {
organisationIdentities.forEach(this::runFor);
}

public void runFor(OrganisationIdentity organisationIdentity) {
log.info(String.format("Running Media Analysis for %s", organisationIdentity.toString()));
OrgIdentityContextHolder.setContext(organisationIdentity, etlServiceConfig);
Organisation organisation = organisationFactory.create(organisationIdentity);
log.info(String.format("Old organisation schema summary %s", organisation.getSchemaMetadata().getCountsByType()));
Organisation newOrganisation = schemaMigrationService.migrate(organisation);
log.info(String.format("New organisation after migration, schema summary %s", newOrganisation.getSchemaMetadata().getCountsByType()));
// TODO
log.info(String.format("Completed Media Analysis for schema %s with dbUser %s and schemaUser %s", organisationIdentity.getSchemaName(), organisationIdentity.getDbUser(), organisationIdentity.getSchemaUser()));
OrgIdentityContextHolder.setContext(organisationIdentity, etlServiceConfig);
}
}
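A minimal usage sketch for the new service, assuming it is injected as a Spring bean; the UUIDs are placeholders, and the media analysis work itself is still marked TODO in this commit:

    // Hypothetical direct invocations, e.g. from an admin task rather than the scheduled MediaAnalysisJob:
    mediaAnalysisService.runFor("organisation-uuid");                        // one organisation by UUID
    mediaAnalysisService.runForOrganisationGroup("organisation-group-uuid"); // every organisation in a group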