Merge branch '9.2'
himeshr committed Aug 8, 2024
2 parents 590169a + 4a08b57 commit f1263a2
Showing 40 changed files with 1,046 additions and 107 deletions.
14 changes: 14 additions & 0 deletions Makefile
@@ -32,6 +32,20 @@ debug_server: build_server
debug_server_with_dump_data_org: build_server
OPENCHS_DATABASE_NAME=avni_org OPENCHS_CLIENT_ID=dummy OPENCHS_KEYCLOAK_CLIENT_SECRET=dummy AVNI_IDP_TYPE=none java -Xmx2048m -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005 -jar ./build/libs/etl-1.0.0-SNAPSHOT.jar


start_server_staging: build_server
-mkdir -p /tmp/avni-etl-service && sudo ln -s /tmp/avni-etl-service /var/log/avni-etl-service

AVNI_IDP_TYPE=cognito \
OPENCHS_CLIENT_ID=$(OPENCHS_STAGING_APP_CLIENT_ID) \
OPENCHS_USER_POOL=$(OPENCHS_STAGING_USER_POOL_ID) \
OPENCHS_IAM_USER=$(OPENCHS_STAGING_IAM_USER) \
OPENCHS_IAM_USER_ACCESS_KEY=$(OPENCHS_STAGING_IAM_USER_ACCESS_KEY) \
OPENCHS_IAM_USER_SECRET_ACCESS_KEY=$(OPENCHS_STAGING_IAM_USER_SECRET_ACCESS_KEY) \
OPENCHS_BUCKET_NAME=staging-user-media \
OPENCHS_DATABASE_URL=jdbc:postgresql://localhost:6015/openchs \
java -jar ./build/libs/etl-1.0.0-SNAPSHOT.jar

boot_run:
./gradlew bootRun

1 change: 1 addition & 0 deletions build.gradle
@@ -20,6 +20,7 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-quartz'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-security'
implementation 'org.springframework.boot:spring-boot-starter-test'
runtimeOnly 'org.postgresql:postgresql'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
implementation "log4j:log4j:1.2.17"
25 changes: 25 additions & 0 deletions src/main/java/org/avniproject/etl/config/AmazonClientService.java
@@ -8,19 +8,27 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.AmazonS3URI;
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.function.Predicate;
import java.util.regex.Pattern;

@Service
@ConditionalOnProperty(value = "aws.s3.enable", havingValue = "true", matchIfMissing = true)
public class AmazonClientService {

public static final int MAX_KEYS = 1000;
@Value("${avni.bucket.name}")
String bucketName;

@@ -65,4 +73,21 @@ private Date getExpireDate(long expireDuration) {
expiration.setTime(expiration.getTime() + expireDuration);
return expiration;
}

public ArrayList<String> listObjectsInBucket(String s3PathPrefix) {
ArrayList<String> listOfMediaUrls = new ArrayList<>();
ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(bucketName).withPrefix(s3PathPrefix).withMaxKeys(MAX_KEYS);
ListObjectsV2Result result;
do {
result = s3Client.listObjectsV2(req);
for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
listOfMediaUrls.add(objectSummary.getKey());
}
// If there are more than maxKeys keys in the bucket, get a continuation token
// and list the next objects.
String token = result.getNextContinuationToken();
req.setContinuationToken(token);
} while (result.isTruncated());
return listOfMediaUrls;
}
}
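
For orientation, a minimal usage sketch (not part of this commit) of the new listObjectsInBucket helper, assuming the service is injected through Spring; the "demo-org/" prefix is a made-up value, and real prefixes would come from the organisation's media directory.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;

@Component
public class MediaKeyLister {
    private final AmazonClientService amazonClientService;

    @Autowired
    public MediaKeyLister(AmazonClientService amazonClientService) {
        this.amazonClientService = amazonClientService;
    }

    public void printMediaKeys() {
        // Placeholder prefix; listObjectsInBucket pages through results MAX_KEYS (1000) keys at a time
        ArrayList<String> keys = amazonClientService.listObjectsInBucket("demo-org/");
        keys.forEach(System.out::println);
    }
}
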
22 changes: 14 additions & 8 deletions src/main/java/org/avniproject/etl/config/ScheduledJobConfig.java
@@ -1,6 +1,7 @@
package org.avniproject.etl.config;

import org.avniproject.etl.contract.backgroundJob.JobEntityType;
import org.avniproject.etl.contract.backgroundJob.JobGroup;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
@@ -12,24 +13,29 @@

@Component
public class ScheduledJobConfig {
public static final String SYNC_JOB_GROUP = "SyncJobs";
public static final String SYNC_TRIGGER_GROUP = "SyncTriggers";
public static final String JOB_CREATED_AT = "CreatedAt";
public static final String ENTITY_TYPE = "EntityType";

@Value("${avni.scheduledJob.repeatIntervalInMinutes}")
@Value("${avni.scheduledJob.sync.repeatIntervalInMinutes}")
private int repeatIntervalInMinutes;

public TriggerKey getTriggerKey(String organisationUUID) {
return new TriggerKey(organisationUUID, SYNC_TRIGGER_GROUP);
@Value("${avni.scheduledJob.mediaAnalysis.repeatIntervalInMinutes}")
private int mediaAnalysisRepeatIntervalInMinutes;

public TriggerKey getTriggerKey(String organisationUUID, JobGroup jobGroup) {
return new TriggerKey(organisationUUID, jobGroup.getTriggerName());
}

public int getRepeatIntervalInMinutes() {
public int getSyncRepeatIntervalInMinutes() {
return repeatIntervalInMinutes;
}

public JobKey getJobKey(String organisationUUID) {
return new JobKey(organisationUUID, SYNC_JOB_GROUP);
public int getMediaAnalysisRepeatIntervalInMinutes() {
return mediaAnalysisRepeatIntervalInMinutes;
}

public JobKey getJobKey(String organisationUUID, JobGroup jobGroup) {
return new JobKey(organisationUUID, jobGroup.getGroupName());
}

public String getEntityId(JobDetail jobDetail) {
10 changes: 10 additions & 0 deletions src/main/java/org/avniproject/etl/contract/JobScheduleRequest.java
@@ -1,10 +1,12 @@
package org.avniproject.etl.contract;

import org.avniproject.etl.contract.backgroundJob.JobEntityType;
import org.avniproject.etl.contract.backgroundJob.JobGroup;

public class JobScheduleRequest {
private String entityUUID;
private JobEntityType jobEntityType;
private JobGroup jobGroup = JobGroup.Sync;

public String getEntityUUID() {
return entityUUID;
@@ -21,4 +23,12 @@ public JobEntityType getJobEntityType() {
public void setJobEntityType(JobEntityType jobEntityType) {
this.jobEntityType = jobEntityType;
}

public JobGroup getJobGroup() {
return jobGroup;
}

public void setJobGroup(JobGroup jobGroup) {
this.jobGroup = jobGroup;
}
}
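
A small illustrative snippet, not taken from the commit, showing how a caller of the scheduling API might populate the new field; leaving the setter out keeps the default of JobGroup.Sync. The UUID is a placeholder.

import org.avniproject.etl.contract.JobScheduleRequest;
import org.avniproject.etl.contract.backgroundJob.JobGroup;

public class JobScheduleRequestExample {
    public static void main(String[] args) {
        JobScheduleRequest request = new JobScheduleRequest();
        request.setEntityUUID("00000000-0000-0000-0000-000000000000"); // placeholder organisation/group UUID
        request.setJobGroup(JobGroup.MediaAnalysis); // omit to schedule a regular Sync job
        System.out.println(request.getJobGroup());
    }
}
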
20 changes: 20 additions & 0 deletions src/main/java/org/avniproject/etl/contract/backgroundJob/JobGroup.java
@@ -0,0 +1,20 @@
package org.avniproject.etl.contract.backgroundJob;

public enum JobGroup {
Sync("SyncJobs", "SyncTriggers"), MediaAnalysis("MediaAnalysisJobs", "MediaAnalysisTriggers");

String groupName, triggerName;

JobGroup(String groupName, String triggerName) {
this.groupName = groupName;
this.triggerName = triggerName;
}

public String getGroupName() {
return groupName;
}

public String getTriggerName() {
return triggerName;
}
}
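
As a point of reference only, a sketch of how the enum's group and trigger names feed Quartz identifiers, mirroring ScheduledJobConfig.getJobKey and getTriggerKey above; the UUID is a placeholder.

import org.avniproject.etl.contract.backgroundJob.JobGroup;
import org.quartz.JobKey;
import org.quartz.TriggerKey;

public class JobGroupKeysExample {
    public static void main(String[] args) {
        String organisationUUID = "00000000-0000-0000-0000-000000000000"; // placeholder UUID
        JobGroup jobGroup = JobGroup.MediaAnalysis;

        JobKey jobKey = new JobKey(organisationUUID, jobGroup.getGroupName());               // group "MediaAnalysisJobs"
        TriggerKey triggerKey = new TriggerKey(organisationUUID, jobGroup.getTriggerName()); // group "MediaAnalysisTriggers"

        System.out.println(jobKey);
        System.out.println(triggerKey);
    }
}
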
@@ -3,32 +3,26 @@
import org.apache.log4j.Logger;
import org.avniproject.etl.config.ScheduledJobConfig;
import org.avniproject.etl.contract.JobScheduleRequest;
import org.avniproject.etl.contract.backgroundJob.EtlJobHistoryItem;
import org.avniproject.etl.contract.backgroundJob.EtlJobStatus;
import org.avniproject.etl.contract.backgroundJob.EtlJobSummary;
import org.avniproject.etl.contract.backgroundJob.JobEntityType;
import org.avniproject.etl.contract.backgroundJob.*;
import org.avniproject.etl.domain.OrganisationIdentity;
import org.avniproject.etl.repository.OrganisationRepository;
import org.avniproject.etl.scheduler.EtlJob;
import org.avniproject.etl.scheduler.MediaAnalysisJob;
import org.avniproject.etl.service.backgroundJob.ScheduledJobService;
import org.avniproject.etl.util.DateTimeUtil;
import org.quartz.JobDataMap;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.*;
import org.quartz.impl.JobDetailImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static org.avniproject.etl.config.ScheduledJobConfig.SYNC_JOB_GROUP;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

@@ -50,23 +44,22 @@ public EtlJobController(Scheduler scheduler, ScheduledJobConfig scheduledJobConf

@PreAuthorize("hasAnyAuthority('admin')")
@GetMapping("/job/{entityUUID}")
public ResponseEntity getJob(@PathVariable String entityUUID) throws SchedulerException {
EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(entityUUID);
if (latestJobRun == null)
return ResponseEntity.notFound().build();
public ResponseEntity getJob(@PathVariable String entityUUID, @RequestParam(value="jobGroup", required = false) JobGroup jobGroup) throws SchedulerException {
EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(entityUUID, jobGroup != null ? jobGroup : JobGroup.Sync);
if (latestJobRun == null) return ResponseEntity.notFound().build();
return new ResponseEntity(latestJobRun, HttpStatus.OK);
}

@PreAuthorize("hasAnyAuthority('admin')")
@PostMapping("/job/status")
public List<EtlJobStatus> getStatuses(@RequestBody List<String> organisationUUIDs) {
return scheduledJobService.getJobs(organisationUUIDs);
public List<EtlJobStatus> getStatuses(@RequestBody List<String> organisationUUIDs, @RequestParam(value="jobGroup", required = false) JobGroup jobGroup) {
return scheduledJobService.getJobs(organisationUUIDs, jobGroup != null ? jobGroup : JobGroup.Sync);
}

@PreAuthorize("hasAnyAuthority('admin')")
@GetMapping("/job/history/{entityUUID}")
public List<EtlJobHistoryItem> getJobHistory(@PathVariable String entityUUID) {
return scheduledJobService.getJobHistory(entityUUID);
public List<EtlJobHistoryItem> getJobHistory(@PathVariable String entityUUID, @RequestParam(value="jobGroup", required = false) JobGroup jobGroup){
return scheduledJobService.getJobHistory(entityUUID, jobGroup != null ? jobGroup : JobGroup.Sync);
}

@PreAuthorize("hasAnyAuthority('admin')")
@@ -79,52 +72,70 @@ public ResponseEntity createJob(@RequestBody JobScheduleRequest jobScheduleReque
else
organisationIdentitiesInGroup = organisationRepository.getOrganisationGroup(jobScheduleRequest.getEntityUUID());

if (organisationIdentity == null && organisationIdentitiesInGroup.size() == 0)
return ResponseEntity.badRequest().body(String.format("No such organisation or group exists: %s", jobScheduleRequest.getEntityUUID()));

EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(jobScheduleRequest.getEntityUUID());
if (latestJobRun != null)
return ResponseEntity.badRequest().body("Job already present");
ResponseEntity<String> jobScheduleValidationResult = validateRequest(jobScheduleRequest, organisationIdentity, organisationIdentitiesInGroup);
if (jobScheduleValidationResult != null) return jobScheduleValidationResult;

JobDetailImpl jobDetail = getJobDetail(jobScheduleRequest, organisationIdentity, organisationIdentitiesInGroup);
scheduler.addJob(jobDetail, false);
Trigger trigger = getTrigger(jobScheduleRequest, jobDetail);
scheduler.scheduleJob(trigger);
logger.info(String.format("Job Scheduled for %s:%s", jobScheduleRequest.getJobEntityType(), jobScheduleRequest.getEntityUUID()));
logger.info(String.format("%s type job Scheduled for %s:%s", jobScheduleRequest.getJobGroup(), jobScheduleRequest.getJobEntityType(), jobScheduleRequest.getEntityUUID()));
return ResponseEntity.ok().body("Job Scheduled!");
}

private ResponseEntity<String> validateRequest(JobScheduleRequest jobScheduleRequest, OrganisationIdentity organisationIdentity, List<OrganisationIdentity> organisationIdentitiesInGroup) throws SchedulerException {
if (organisationIdentity == null && organisationIdentitiesInGroup.size() == 0) {
return ResponseEntity.badRequest().body(String.format("No such organisation or group exists: %s", jobScheduleRequest.getEntityUUID()));
}
EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(jobScheduleRequest.getEntityUUID(), jobScheduleRequest.getJobGroup());
if (latestJobRun != null) return ResponseEntity.badRequest().body("Job already present");
if (!jobScheduleRequest.getJobGroup().equals(JobGroup.Sync)) {
EtlJobSummary correspondingSyncJobRun = scheduledJobService.getLatestJobRun(jobScheduleRequest.getEntityUUID(), JobGroup.Sync);
if (correspondingSyncJobRun == null)
return ResponseEntity.badRequest().body("Sync Job has not been triggered for this Org / OrgGroup");
}
return null;
}

private Trigger getTrigger(JobScheduleRequest jobScheduleRequest, JobDetailImpl jobDetail) {
SimpleScheduleBuilder scheduleBuilder = simpleSchedule()
.withIntervalInMinutes(scheduledJobConfig.getRepeatIntervalInMinutes()).repeatForever();

Trigger trigger = newTrigger()
.withIdentity(scheduledJobConfig.getTriggerKey(jobScheduleRequest.getEntityUUID()))
.forJob(jobDetail)
.withSchedule(scheduleBuilder)
.startAt(DateTimeUtil.nowPlusSeconds(5))
.build();
SimpleScheduleBuilder scheduleBuilder = simpleSchedule().withIntervalInMinutes(jobScheduleRequest.getJobGroup().equals(JobGroup.Sync) ? scheduledJobConfig.getSyncRepeatIntervalInMinutes() : scheduledJobConfig.getMediaAnalysisRepeatIntervalInMinutes()).repeatForever();

Trigger trigger = newTrigger().withIdentity(scheduledJobConfig.getTriggerKey(jobScheduleRequest.getEntityUUID(), jobScheduleRequest.getJobGroup())).forJob(jobDetail).withSchedule(scheduleBuilder).startAt(DateTimeUtil.nowPlusSeconds(5)).build();
return trigger;
}

private JobDetailImpl getJobDetail(JobScheduleRequest jobScheduleRequest, OrganisationIdentity organisationIdentity, List<OrganisationIdentity> organisationIdentitiesInGroup) {
JobDetailImpl jobDetail = new JobDetailImpl();
jobDetail.setJobClass(EtlJob.class);
jobDetail.setJobClass(jobScheduleRequest.getJobGroup().equals(JobGroup.Sync) ? EtlJob.class : MediaAnalysisJob.class);
jobDetail.setDurability(true);
jobDetail.setKey(scheduledJobConfig.getJobKey(jobScheduleRequest.getEntityUUID()));
jobDetail.setDescription(organisationIdentity == null ?
organisationIdentitiesInGroup.stream().map(OrganisationIdentity::getSchemaName).collect(Collectors.joining(";")) : organisationIdentity.getSchemaName());
jobDetail.setGroup(SYNC_JOB_GROUP);
jobDetail.setKey(scheduledJobConfig.getJobKey(jobScheduleRequest.getEntityUUID(), jobScheduleRequest.getJobGroup()));
String truncatedDescription = getTruncatedDescription(organisationIdentity, organisationIdentitiesInGroup);
jobDetail.setDescription(truncatedDescription);
jobDetail.setGroup(jobScheduleRequest.getJobGroup().getGroupName());
jobDetail.setName(jobScheduleRequest.getEntityUUID());
JobDataMap jobDataMap = scheduledJobConfig.createJobData(jobScheduleRequest.getJobEntityType());
jobDetail.setJobDataMap(jobDataMap);
return jobDetail;
}

private String getTruncatedDescription(OrganisationIdentity organisationIdentity, List<OrganisationIdentity> organisationIdentitiesInGroup) {
String orgGroupSchemaNames = organisationIdentitiesInGroup.stream().map(OrganisationIdentity::getSchemaName).collect(Collectors.joining(";"));
String description = organisationIdentity == null ? "OrgGroup Schema names: " + orgGroupSchemaNames : organisationIdentity.toString();
return StringUtils.truncate(description, 240);
}

@PreAuthorize("hasAnyAuthority('admin')")
@DeleteMapping(value = "/job/{id}")
public String deleteJob(@PathVariable String id) throws SchedulerException {
boolean jobDeleted = scheduler.deleteJob(scheduledJobConfig.getJobKey(id));
return jobDeleted ? "Job Deleted" : "Job Not Deleted";
public String deleteJob(@PathVariable String id, @RequestParam(value="jobGroup", required = false) JobGroup jobGroup) throws SchedulerException {
boolean syncJobDeleted = scheduler.deleteJob(scheduledJobConfig.getJobKey(id, jobGroup != null ? jobGroup : JobGroup.Sync));
String responseMsg = String.format("Sync Job Deleted: %s; ",syncJobDeleted);
if (jobGroup != null && jobGroup == JobGroup.Sync) {
EtlJobSummary mediaJobRun = scheduledJobService.getLatestJobRun(id, JobGroup.MediaAnalysis);
if (mediaJobRun != null) {
boolean mediaAnalysisJobDeleted = scheduler.deleteJob(scheduledJobConfig.getJobKey(id, JobGroup.MediaAnalysis));
responseMsg = responseMsg.concat(String.format("MediaAnalysis Job Deleted: %s;", mediaAnalysisJobDeleted)); // reassign: String.concat returns a new string
}
}
return responseMsg;
}
}
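
For context, a hedged sketch of calling the jobGroup-aware endpoints from a client; the base URL is a placeholder, and the admin authentication these endpoints require is assumed to be handled separately.

import org.avniproject.etl.contract.backgroundJob.EtlJobSummary;
import org.springframework.web.client.RestTemplate;

public class EtlJobClientExample {
    public static void main(String[] args) {
        RestTemplate restTemplate = new RestTemplate();
        String base = "http://localhost:8080";                   // placeholder host and port
        String orgUUID = "00000000-0000-0000-0000-000000000000"; // placeholder organisation UUID

        // Latest MediaAnalysis run for the organisation; omitting jobGroup falls back to Sync
        EtlJobSummary summary = restTemplate.getForObject(
                base + "/job/{uuid}?jobGroup=MediaAnalysis", EtlJobSummary.class, orgUUID);
        System.out.println(summary);

        // Deleting with jobGroup=Sync also removes a scheduled MediaAnalysis job, per deleteJob above
        restTemplate.delete(base + "/job/{uuid}?jobGroup=Sync", orgUUID);
    }
}
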
17 changes: 12 additions & 5 deletions src/main/java/org/avniproject/etl/domain/OrganisationIdentity.java
@@ -9,30 +9,33 @@ public class OrganisationIdentity {
private final String dbUser;
private final String schemaName;
private final String schemaUser;
private final String mediaDirectory;
private List<String> orgGroupOrgDbUsers;
private Date startTime;

private OrganisationIdentity(String dbUser, String schemaName, String schemaUser) {
private OrganisationIdentity(String dbUser, String schemaName, String schemaUser, String mediaDirectory) {
this.dbUser = dbUser;
this.schemaName = schemaName;
this.schemaUser = schemaUser;
this.mediaDirectory = mediaDirectory;
}

/**
* @param dbUser The database user who has access to the source data of the organisation and is also a schema user
* @param schemaName The destination schema name to be created/updated
*/
public static OrganisationIdentity createForOrganisation(String dbUser, String schemaName) {
return new OrganisationIdentity(dbUser, schemaName, dbUser);
public static OrganisationIdentity createForOrganisation(String dbUser, String schemaName, String mediaDirectory) {
return new OrganisationIdentity(dbUser, schemaName, dbUser, mediaDirectory);
}


/**
* @param dbUser The database user who has access to the source data of the organisation
* @param schemaName The destination schema name to be created/updated
* @param schemaUser The destination user who will have access to the schema
*/
public static OrganisationIdentity createForOrganisationGroup(String dbUser, String schemaName, String schemaUser) {
return new OrganisationIdentity(dbUser, schemaName, schemaUser);
return new OrganisationIdentity(dbUser, schemaName, schemaUser, null);
}

public String getSchemaName() {
@@ -41,7 +44,7 @@ public String getSchemaName() {

@Override
public String toString() {
return String.format("Schema: %s, DB User: %s, Schema User: %s", schemaName, dbUser, schemaUser);
return String.format("Schema: %s, DB User: %s, Schema User: %s, MediaDirectory: %s", schemaName, dbUser, schemaUser, mediaDirectory);
}

public String getDbUser() {
@@ -71,4 +74,8 @@ public Date getStartTime() {
public void setStartTime(Date startTime) {
this.startTime = startTime;
}

public String getMediaDirectory() {
return mediaDirectory;
}
}
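
A minimal sketch, with placeholder values, of creating an OrganisationIdentity that carries the new media directory; real values would come from the organisation records.

import org.avniproject.etl.domain.OrganisationIdentity;

public class OrganisationIdentityExample {
    public static void main(String[] args) {
        OrganisationIdentity identity = OrganisationIdentity.createForOrganisation(
                "demo_user",    // dbUser, which also acts as the schema user for a single organisation
                "demo_schema",  // destination schema name
                "demo");        // new: the organisation's media directory (S3 prefix)

        System.out.println(identity.getMediaDirectory()); // "demo"
        System.out.println(identity);                      // toString() now reports the MediaDirectory too
    }
}
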
(Diffs for the remaining changed files are not shown here.)
