From 774a615bb20e429231d17df51feadf6ccf065721 Mon Sep 17 00:00:00 2001 From: himeshr Date: Tue, 16 Jul 2024 17:45:34 +0530 Subject: [PATCH 01/14] #102 | Override all ETL Job endpoints to support 2 different jobGroups for each organisation: Sync and MediaAnalysis --- .../etl/config/ScheduledJobConfig.java | 22 +++--- .../etl/contract/JobScheduleRequest.java | 10 +++ .../etl/contract/backgroundJob/JobGroup.java | 20 ++++++ .../backgroundJob/EtlJobController.java | 70 ++++++++----------- .../etl/scheduler/MediaAnalysisJob.java | 30 ++++++++ .../etl/service/MediaAnalysisService.java | 57 +++++++++++++++ .../backgroundJob/ScheduledJobService.java | 32 +++++---- .../resources/main-application.properties | 3 +- 8 files changed, 183 insertions(+), 61 deletions(-) create mode 100644 src/main/java/org/avniproject/etl/contract/backgroundJob/JobGroup.java create mode 100644 src/main/java/org/avniproject/etl/scheduler/MediaAnalysisJob.java create mode 100644 src/main/java/org/avniproject/etl/service/MediaAnalysisService.java diff --git a/src/main/java/org/avniproject/etl/config/ScheduledJobConfig.java b/src/main/java/org/avniproject/etl/config/ScheduledJobConfig.java index e8dfc7b..cd987e9 100644 --- a/src/main/java/org/avniproject/etl/config/ScheduledJobConfig.java +++ b/src/main/java/org/avniproject/etl/config/ScheduledJobConfig.java @@ -1,6 +1,7 @@ package org.avniproject.etl.config; import org.avniproject.etl.contract.backgroundJob.JobEntityType; +import org.avniproject.etl.contract.backgroundJob.JobGroup; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobKey; @@ -12,24 +13,29 @@ @Component public class ScheduledJobConfig { - public static final String SYNC_JOB_GROUP = "SyncJobs"; - public static final String SYNC_TRIGGER_GROUP = "SyncTriggers"; public static final String JOB_CREATED_AT = "CreatedAt"; public static final String ENTITY_TYPE = "EntityType"; - @Value("${avni.scheduledJob.repeatIntervalInMinutes}") + @Value("${avni.scheduledJob.sync.repeatIntervalInMinutes}") private int repeatIntervalInMinutes; - public TriggerKey getTriggerKey(String organisationUUID) { - return new TriggerKey(organisationUUID, SYNC_TRIGGER_GROUP); + @Value("${avni.scheduledJob.mediaAnalysis.repeatIntervalInMinutes}") + private int mediaAnalysisRepeatIntervalInMinutes; + + public TriggerKey getTriggerKey(String organisationUUID, JobGroup jobGroup) { + return new TriggerKey(organisationUUID, jobGroup.getTriggerName()); } - public int getRepeatIntervalInMinutes() { + public int getSyncRepeatIntervalInMinutes() { return repeatIntervalInMinutes; } - public JobKey getJobKey(String organisationUUID) { - return new JobKey(organisationUUID, SYNC_JOB_GROUP); + public int getMediaAnalysisRepeatIntervalInMinutes() { + return mediaAnalysisRepeatIntervalInMinutes; + } + + public JobKey getJobKey(String organisationUUID, JobGroup jobGroup) { + return new JobKey(organisationUUID, jobGroup.getGroupName()); } public String getEntityId(JobDetail jobDetail) { diff --git a/src/main/java/org/avniproject/etl/contract/JobScheduleRequest.java b/src/main/java/org/avniproject/etl/contract/JobScheduleRequest.java index 1e35382..c678fb0 100644 --- a/src/main/java/org/avniproject/etl/contract/JobScheduleRequest.java +++ b/src/main/java/org/avniproject/etl/contract/JobScheduleRequest.java @@ -1,10 +1,12 @@ package org.avniproject.etl.contract; import org.avniproject.etl.contract.backgroundJob.JobEntityType; +import org.avniproject.etl.contract.backgroundJob.JobGroup; public class JobScheduleRequest { private String 
entityUUID; private JobEntityType jobEntityType; + private JobGroup jobGroup = JobGroup.Sync; public String getEntityUUID() { return entityUUID; @@ -21,4 +23,12 @@ public JobEntityType getJobEntityType() { public void setJobEntityType(JobEntityType jobEntityType) { this.jobEntityType = jobEntityType; } + + public JobGroup getJobGroup() { + return jobGroup; + } + + public void setJobGroup(JobGroup jobGroup) { + this.jobGroup = jobGroup; + } } diff --git a/src/main/java/org/avniproject/etl/contract/backgroundJob/JobGroup.java b/src/main/java/org/avniproject/etl/contract/backgroundJob/JobGroup.java new file mode 100644 index 0000000..f202293 --- /dev/null +++ b/src/main/java/org/avniproject/etl/contract/backgroundJob/JobGroup.java @@ -0,0 +1,20 @@ +package org.avniproject.etl.contract.backgroundJob; + +public enum JobGroup { + Sync("SyncJobs", "SyncTriggers"), MediaAnalysis("MediaAnalysisJobs", "MediaAnalysisTriggers"); + + String groupName, triggerName; + + JobGroup(String groupName, String triggerName) { + this.groupName = groupName; + this.triggerName = triggerName; + } + + public String getGroupName() { + return groupName; + } + + public String getTriggerName() { + return triggerName; + } +} diff --git a/src/main/java/org/avniproject/etl/controller/backgroundJob/EtlJobController.java b/src/main/java/org/avniproject/etl/controller/backgroundJob/EtlJobController.java index b860358..284c05e 100644 --- a/src/main/java/org/avniproject/etl/controller/backgroundJob/EtlJobController.java +++ b/src/main/java/org/avniproject/etl/controller/backgroundJob/EtlJobController.java @@ -3,20 +3,14 @@ import org.apache.log4j.Logger; import org.avniproject.etl.config.ScheduledJobConfig; import org.avniproject.etl.contract.JobScheduleRequest; -import org.avniproject.etl.contract.backgroundJob.EtlJobHistoryItem; -import org.avniproject.etl.contract.backgroundJob.EtlJobStatus; -import org.avniproject.etl.contract.backgroundJob.EtlJobSummary; -import org.avniproject.etl.contract.backgroundJob.JobEntityType; +import org.avniproject.etl.contract.backgroundJob.*; import org.avniproject.etl.domain.OrganisationIdentity; import org.avniproject.etl.repository.OrganisationRepository; import org.avniproject.etl.scheduler.EtlJob; +import org.avniproject.etl.scheduler.MediaAnalysisJob; import org.avniproject.etl.service.backgroundJob.ScheduledJobService; import org.avniproject.etl.util.DateTimeUtil; -import org.quartz.JobDataMap; -import org.quartz.Scheduler; -import org.quartz.SchedulerException; -import org.quartz.SimpleScheduleBuilder; -import org.quartz.Trigger; +import org.quartz.*; import org.quartz.impl.JobDetailImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; @@ -28,7 +22,6 @@ import java.util.List; import java.util.stream.Collectors; -import static org.avniproject.etl.config.ScheduledJobConfig.SYNC_JOB_GROUP; import static org.quartz.SimpleScheduleBuilder.simpleSchedule; import static org.quartz.TriggerBuilder.newTrigger; @@ -50,23 +43,22 @@ public EtlJobController(Scheduler scheduler, ScheduledJobConfig scheduledJobConf @PreAuthorize("hasAnyAuthority('admin')") @GetMapping("/job/{entityUUID}") - public ResponseEntity getJob(@PathVariable String entityUUID) throws SchedulerException { - EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(entityUUID); - if (latestJobRun == null) - return ResponseEntity.notFound().build(); + public ResponseEntity getJob(@PathVariable String entityUUID, @RequestParam(value="jobGroup", required = false) 
JobGroup jobGroup) throws SchedulerException { + EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(entityUUID, jobGroup != null ? jobGroup : JobGroup.Sync); + if (latestJobRun == null) return ResponseEntity.notFound().build(); return new ResponseEntity(latestJobRun, HttpStatus.OK); } @PreAuthorize("hasAnyAuthority('admin')") @PostMapping("/job/status") - public List getStatuses(@RequestBody List organisationUUIDs) { - return scheduledJobService.getJobs(organisationUUIDs); + public List getStatuses(@RequestBody List organisationUUIDs, @RequestParam(value="jobGroup", required = false) JobGroup jobGroup) { + return scheduledJobService.getJobs(organisationUUIDs, jobGroup != null ? jobGroup : JobGroup.Sync); } @PreAuthorize("hasAnyAuthority('admin')") @GetMapping("/job/history/{entityUUID}") - public List getJobHistory(@PathVariable String entityUUID) { - return scheduledJobService.getJobHistory(entityUUID); + public List getJobHistory(@PathVariable String entityUUID, @RequestParam(value="jobGroup", required = false) JobGroup jobGroup){ + return scheduledJobService.getJobHistory(entityUUID, jobGroup != null ? jobGroup : JobGroup.Sync); } @PreAuthorize("hasAnyAuthority('admin')") @@ -82,39 +74,31 @@ public ResponseEntity createJob(@RequestBody JobScheduleRequest jobScheduleReque if (organisationIdentity == null && organisationIdentitiesInGroup.size() == 0) return ResponseEntity.badRequest().body(String.format("No such organisation or group exists: %s", jobScheduleRequest.getEntityUUID())); - EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(jobScheduleRequest.getEntityUUID()); - if (latestJobRun != null) - return ResponseEntity.badRequest().body("Job already present"); + EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(jobScheduleRequest.getEntityUUID(), jobScheduleRequest.getJobGroup()); + if (latestJobRun != null) return ResponseEntity.badRequest().body("Job already present"); JobDetailImpl jobDetail = getJobDetail(jobScheduleRequest, organisationIdentity, organisationIdentitiesInGroup); scheduler.addJob(jobDetail, false); Trigger trigger = getTrigger(jobScheduleRequest, jobDetail); scheduler.scheduleJob(trigger); - logger.info(String.format("Job Scheduled for %s:%s", jobScheduleRequest.getJobEntityType(), jobScheduleRequest.getEntityUUID())); + logger.info(String.format("%s type job Scheduled for %s:%s", jobScheduleRequest.getJobGroup(), jobScheduleRequest.getJobEntityType(), jobScheduleRequest.getEntityUUID())); return ResponseEntity.ok().body("Job Scheduled!"); } private Trigger getTrigger(JobScheduleRequest jobScheduleRequest, JobDetailImpl jobDetail) { - SimpleScheduleBuilder scheduleBuilder = simpleSchedule() - .withIntervalInMinutes(scheduledJobConfig.getRepeatIntervalInMinutes()).repeatForever(); - - Trigger trigger = newTrigger() - .withIdentity(scheduledJobConfig.getTriggerKey(jobScheduleRequest.getEntityUUID())) - .forJob(jobDetail) - .withSchedule(scheduleBuilder) - .startAt(DateTimeUtil.nowPlusSeconds(5)) - .build(); + SimpleScheduleBuilder scheduleBuilder = simpleSchedule().withIntervalInMinutes(jobScheduleRequest.getJobGroup().equals(JobGroup.Sync) ? 
scheduledJobConfig.getSyncRepeatIntervalInMinutes() : scheduledJobConfig.getMediaAnalysisRepeatIntervalInMinutes()).repeatForever(); + + Trigger trigger = newTrigger().withIdentity(scheduledJobConfig.getTriggerKey(jobScheduleRequest.getEntityUUID(), jobScheduleRequest.getJobGroup())).forJob(jobDetail).withSchedule(scheduleBuilder).startAt(DateTimeUtil.nowPlusSeconds(5)).build(); return trigger; } private JobDetailImpl getJobDetail(JobScheduleRequest jobScheduleRequest, OrganisationIdentity organisationIdentity, List organisationIdentitiesInGroup) { JobDetailImpl jobDetail = new JobDetailImpl(); - jobDetail.setJobClass(EtlJob.class); + jobDetail.setJobClass(jobScheduleRequest.getJobGroup().equals(JobGroup.Sync) ? EtlJob.class : MediaAnalysisJob.class); jobDetail.setDurability(true); - jobDetail.setKey(scheduledJobConfig.getJobKey(jobScheduleRequest.getEntityUUID())); - jobDetail.setDescription(organisationIdentity == null ? - organisationIdentitiesInGroup.stream().map(OrganisationIdentity::toString).collect(Collectors.joining(";")) : organisationIdentity.toString()); - jobDetail.setGroup(SYNC_JOB_GROUP); + jobDetail.setKey(scheduledJobConfig.getJobKey(jobScheduleRequest.getEntityUUID(), jobScheduleRequest.getJobGroup())); + jobDetail.setDescription(organisationIdentity == null ? organisationIdentitiesInGroup.stream().map(OrganisationIdentity::toString).collect(Collectors.joining(";")) : organisationIdentity.toString()); + jobDetail.setGroup(jobScheduleRequest.getJobGroup().getGroupName()); jobDetail.setName(jobScheduleRequest.getEntityUUID()); JobDataMap jobDataMap = scheduledJobConfig.createJobData(jobScheduleRequest.getJobEntityType()); jobDetail.setJobDataMap(jobDataMap); @@ -123,8 +107,16 @@ private JobDetailImpl getJobDetail(JobScheduleRequest jobScheduleRequest, Organi @PreAuthorize("hasAnyAuthority('admin')") @DeleteMapping(value = "/job/{id}") - public String deleteJob(@PathVariable String id) throws SchedulerException { - boolean jobDeleted = scheduler.deleteJob(scheduledJobConfig.getJobKey(id)); - return jobDeleted ? "Job Deleted" : "Job Not Deleted"; + public String deleteJob(@PathVariable String id, @RequestParam(value="jobGroup", required = false) JobGroup jobGroup) throws SchedulerException { + boolean syncJobDeleted = scheduler.deleteJob(scheduledJobConfig.getJobKey(id, jobGroup != null ? 
jobGroup : JobGroup.Sync)); + String responseMsg = String.format("Sync Job Deleted: %s; ",syncJobDeleted); + if (jobGroup != null && jobGroup == JobGroup.Sync) { + EtlJobSummary mediaJobRun = scheduledJobService.getLatestJobRun(id, JobGroup.MediaAnalysis); + if (mediaJobRun != null) { + boolean mediaAnalysisJobDeleted = scheduler.deleteJob(scheduledJobConfig.getJobKey(id, JobGroup.MediaAnalysis)); + responseMsg.concat(String.format("MediaAnalysis Job Deleted: %s;", mediaAnalysisJobDeleted)); + } + } + return responseMsg; } } diff --git a/src/main/java/org/avniproject/etl/scheduler/MediaAnalysisJob.java b/src/main/java/org/avniproject/etl/scheduler/MediaAnalysisJob.java new file mode 100644 index 0000000..f7dcf98 --- /dev/null +++ b/src/main/java/org/avniproject/etl/scheduler/MediaAnalysisJob.java @@ -0,0 +1,30 @@ +package org.avniproject.etl.scheduler; + +import org.avniproject.etl.config.ScheduledJobConfig; +import org.avniproject.etl.contract.backgroundJob.JobEntityType; +import org.avniproject.etl.service.MediaAnalysisService; +import org.quartz.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class MediaAnalysisJob implements Job { + private final MediaAnalysisService mediaAnalysisService; + private final ScheduledJobConfig scheduledJobConfig; + + @Autowired + public MediaAnalysisJob(MediaAnalysisService mediaAnalysisService, ScheduledJobConfig scheduledJobConfig) { + this.mediaAnalysisService = mediaAnalysisService; + this.scheduledJobConfig = scheduledJobConfig; + } + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + JobDetail jobDetail = context.getJobDetail(); + JobDataMap jobDataMap = jobDetail.getJobDataMap(); + String entityId = scheduledJobConfig.getEntityId(jobDetail); + if (jobDataMap.get(ScheduledJobConfig.ENTITY_TYPE).equals(JobEntityType.Organisation)) + mediaAnalysisService.runFor(entityId); + else mediaAnalysisService.runForOrganisationGroup(entityId); + } +} diff --git a/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java b/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java new file mode 100644 index 0000000..5089bec --- /dev/null +++ b/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java @@ -0,0 +1,57 @@ +package org.avniproject.etl.service; + +import org.apache.log4j.Logger; +import org.avniproject.etl.config.EtlServiceConfig; +import org.avniproject.etl.domain.OrgIdentityContextHolder; +import org.avniproject.etl.domain.Organisation; +import org.avniproject.etl.domain.OrganisationIdentity; +import org.avniproject.etl.repository.OrganisationRepository; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +public class MediaAnalysisService { + private final OrganisationRepository organisationRepository; + private final OrganisationFactory organisationFactory; + private final SchemaMigrationService schemaMigrationService; + private final SyncService syncService; + private final EtlServiceConfig etlServiceConfig; + private static final Logger log = Logger.getLogger(MediaAnalysisService.class); + + @Autowired + public MediaAnalysisService(OrganisationRepository organisationRepository, OrganisationFactory organisationFactory, SchemaMigrationService schemaMigrationService, SyncService syncService, EtlServiceConfig etlServiceConfig) { + this.organisationRepository = organisationRepository; + 
this.organisationFactory = organisationFactory; + this.schemaMigrationService = schemaMigrationService; + this.syncService = syncService; + this.etlServiceConfig = etlServiceConfig; + } + + public void runFor(String organisationUUID) { + OrganisationIdentity organisationIdentity = organisationRepository.getOrganisation(organisationUUID); + this.runFor(organisationIdentity); + } + + public void runForOrganisationGroup(String organisationGroupUUID) { + List organisationIdentities = organisationRepository.getOrganisationGroup(organisationGroupUUID); + this.runFor(organisationIdentities); + } + + public void runFor(List organisationIdentities) { + organisationIdentities.forEach(this::runFor); + } + + public void runFor(OrganisationIdentity organisationIdentity) { + log.info(String.format("Running Media Analysis for %s", organisationIdentity.toString())); + OrgIdentityContextHolder.setContext(organisationIdentity, etlServiceConfig); + Organisation organisation = organisationFactory.create(organisationIdentity); + log.info(String.format("Old organisation schema summary %s", organisation.getSchemaMetadata().getCountsByType())); + Organisation newOrganisation = schemaMigrationService.migrate(organisation); + log.info(String.format("New organisation after migration, schema summary %s", newOrganisation.getSchemaMetadata().getCountsByType())); +// TODO + log.info(String.format("Completed Media Analysis for schema %s with dbUser %s and schemaUser %s", organisationIdentity.getSchemaName(), organisationIdentity.getDbUser(), organisationIdentity.getSchemaUser())); + OrgIdentityContextHolder.setContext(organisationIdentity, etlServiceConfig); + } +} diff --git a/src/main/java/org/avniproject/etl/service/backgroundJob/ScheduledJobService.java b/src/main/java/org/avniproject/etl/service/backgroundJob/ScheduledJobService.java index df4de77..4a8314a 100644 --- a/src/main/java/org/avniproject/etl/service/backgroundJob/ScheduledJobService.java +++ b/src/main/java/org/avniproject/etl/service/backgroundJob/ScheduledJobService.java @@ -4,6 +4,7 @@ import org.avniproject.etl.contract.backgroundJob.EtlJobHistoryItem; import org.avniproject.etl.contract.backgroundJob.EtlJobStatus; import org.avniproject.etl.contract.backgroundJob.EtlJobSummary; +import org.avniproject.etl.contract.backgroundJob.JobGroup; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SchedulerException; @@ -23,13 +24,9 @@ public class ScheduledJobService { private final Scheduler scheduler; private final ScheduledJobConfig scheduledJobConfig; - private static final String HISTORY_QUERY = "select sjr.started_at, sjr.ended_at, sjr.error_message, sjr.success from qrtz_job_details qjd\n" + - " left outer join scheduled_job_run sjr on sjr.job_name = qjd.job_name\n" + - " where sjr.job_name = ?" + - "order by 1 desc\n"; + private static final String HISTORY_QUERY = "select sjr.started_at, sjr.ended_at, sjr.error_message, sjr.success from qrtz_job_details qjd\n" + " left outer join scheduled_job_run sjr on sjr.job_name = qjd.job_name\n" + " where qjd.job_name = ? and qjd.job_group = ?" 
+ "order by 1 desc\n"; - private static final String JOB_LIST_QUERY = "select organisationUUID, job_name from (SELECT unnest(string_to_array(?, ',')) as organisationUUID) foo\n" + - " left outer join qrtz_job_details qjd on organisationUUID = qjd.job_name"; + private static final String JOB_LIST_QUERY = "select organisationUUID, job_name from (SELECT unnest(string_to_array(?, ',')) as organisationUUID) foo\n" + " left outer join qrtz_job_details qjd on organisationUUID = qjd.job_name where qjd.job_group = ?"; @Autowired public ScheduledJobService(JdbcTemplate jdbcTemplate, Scheduler scheduler, ScheduledJobConfig scheduledJobConfig) { @@ -38,24 +35,33 @@ public ScheduledJobService(JdbcTemplate jdbcTemplate, Scheduler scheduler, Sched this.scheduledJobConfig = scheduledJobConfig; } - public List getJobs(List organisationUUIDs) { + public List getJobs(List organisationUUIDs, JobGroup jobGroup) { String organisations = String.join(",", organisationUUIDs); - return jdbcTemplate.query(JOB_LIST_QUERY, ps -> ps.setString(1, organisations), new EtlJobStatusMapper()); + return jdbcTemplate.query(JOB_LIST_QUERY, ps -> { + ps.setString(1, organisations); + ps.setString(2, jobGroup.getGroupName()); + }, new EtlJobStatusMapper()); } - public EtlJobSummary getLatestJobRun(String organisationUUID) throws SchedulerException { + public EtlJobSummary getLatestJobRun(String organisationUUID, JobGroup jobGroup) throws SchedulerException { String query = HISTORY_QUERY + "limit 1"; - List summaries = jdbcTemplate.query(query, ps -> ps.setString(1, organisationUUID), new EtlJobLatestStatusResponseMapper()); + List summaries = jdbcTemplate.query(query, ps -> { + ps.setString(1, organisationUUID); + ps.setString(2, jobGroup.getGroupName()); + }, new EtlJobLatestStatusResponseMapper()); if (summaries.size() == 0) return null; EtlJobSummary etlJobSummary = summaries.get(0); - JobDetail jobDetail = scheduler.getJobDetail(scheduledJobConfig.getJobKey(organisationUUID)); + JobDetail jobDetail = scheduler.getJobDetail(scheduledJobConfig.getJobKey(organisationUUID, jobGroup)); etlJobSummary.setCreatedAt((Date) jobDetail.getJobDataMap().get(ScheduledJobConfig.JOB_CREATED_AT)); return etlJobSummary; } - public List getJobHistory(String organisationUUID) { - return jdbcTemplate.query(HISTORY_QUERY, ps -> ps.setString(1, organisationUUID), new EtlJobHistoryItemMapper()); + public List getJobHistory(String organisationUUID, JobGroup jobGroup) { + return jdbcTemplate.query(HISTORY_QUERY, ps -> { + ps.setString(1, organisationUUID); + ps.setString(2, jobGroup.getGroupName()); + }, new EtlJobHistoryItemMapper()); } static class EtlJobLatestStatusResponseMapper implements RowMapper { diff --git a/src/main/resources/main-application.properties b/src/main/resources/main-application.properties index bfd5f48..f8d726c 100644 --- a/src/main/resources/main-application.properties +++ b/src/main/resources/main-application.properties @@ -16,7 +16,8 @@ spring.quartz.properties.org.quartz.threadPool.threadCount=${ETL_JOB_THREAD_COUN spring.quartz.properties.org.quartz.jobStore.misfireThreshold = ${AVNI_SCHEDULED_JOB_TRIGGER_MISFIRE_THRESHOLD_IN_MILLISECONDS:3600000} # Internal Scheduler config -avni.scheduledJob.repeatIntervalInMinutes=${AVNI_SCHEDULED_JOB_REPEAT_INTERVAL_IN_MINUTES:90} +avni.scheduledJob.sync.repeatIntervalInMinutes=${AVNI_SCHEDULED_JOB_REPEAT_INTERVAL_IN_MINUTES:90} +avni.scheduledJob.mediaAnalysis.repeatIntervalInMinutes=${AVNI_MEDIA_ANALYSIS_JOB_REPEAT_INTERVAL_IN_MINUTES:2} #S3 Parameters 
avni.bucket.name=${OPENCHS_BUCKET_NAME:dummy} From f2d8de105833c503819ad3c6f02a02388276d84e Mon Sep 17 00:00:00 2001 From: himeshr Date: Tue, 16 Jul 2024 19:11:47 +0530 Subject: [PATCH 02/14] #102 | Introduce function to fetch List objects in org bucket with path prefix and filter --- build.gradle | 1 + .../etl/config/AmazonClientService.java | 35 +++++++++++++++++++ .../etl/domain/OrganisationIdentity.java | 17 ++++++--- .../repository/OrganisationRepository.java | 2 +- .../OrganisationIdentityRowMapper.java | 2 +- .../etl/service/MediaAnalysisService.java | 21 ++++++++--- 6 files changed, 67 insertions(+), 11 deletions(-) diff --git a/build.gradle b/build.gradle index b3799d2..5304e32 100644 --- a/build.gradle +++ b/build.gradle @@ -20,6 +20,7 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-quartz' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-security' + implementation 'org.springframework.boot:spring-boot-starter-test' runtimeOnly 'org.postgresql:postgresql' testImplementation 'org.springframework.boot:spring-boot-starter-test' implementation "log4j:log4j:1.2.17" diff --git a/src/main/java/org/avniproject/etl/config/AmazonClientService.java b/src/main/java/org/avniproject/etl/config/AmazonClientService.java index 80e7d26..4f1626e 100644 --- a/src/main/java/org/avniproject/etl/config/AmazonClientService.java +++ b/src/main/java/org/avniproject/etl/config/AmazonClientService.java @@ -8,19 +8,27 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.AmazonS3URI; import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; import java.net.URL; import java.time.Duration; +import java.util.ArrayList; import java.util.Date; +import java.util.function.Predicate; +import java.util.regex.Pattern; @Service @ConditionalOnProperty(value = "aws.s3.enable", havingValue = "true", matchIfMissing = true) public class AmazonClientService { + public static final int MAX_KEYS = 1000; @Value("${avni.bucket.name}") String bucketName; @@ -65,4 +73,31 @@ private Date getExpireDate(long expireDuration) { expiration.setTime(expiration.getTime() + expireDuration); return expiration; } + + public ArrayList listObjectsInBucket(String s3PathPrefix, String negativeFilterPatternString) { + Boolean isNegativePatternFilteringRequired = StringUtils.hasText(negativeFilterPatternString); + Pattern negativeFilterPattern = Pattern.compile(negativeFilterPatternString); + Predicate negativeFilterPatternPredicate = negativeFilterPattern.asPredicate(); + Pattern uuidRegexPattern = Pattern.compile("^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"); + Predicate uuidRegexPatternPredicate = uuidRegexPattern.asPredicate(); + + ArrayList listOfMediaUrls = new ArrayList<>(); + ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(bucketName).withPrefix(s3PathPrefix).withMaxKeys(MAX_KEYS); + ListObjectsV2Result result; + do { + result = s3Client.listObjectsV2(req); + for 
(S3ObjectSummary objectSummary : result.getObjectSummaries()) { +// System.out.printf(" - %s (size: %d)\n", objectSummary.getKey(), objectSummary.getSize()); + if (uuidRegexPatternPredicate.test(objectSummary.getKey().substring(objectSummary.getKey().lastIndexOf("/") + 1)) + && !(isNegativePatternFilteringRequired && negativeFilterPatternPredicate.test(objectSummary.getKey()))) { + listOfMediaUrls.add(objectSummary.getKey()); + } + } + // If there are more than maxKeys keys in the bucket, get a continuation token + // and list the next objects. + String token = result.getNextContinuationToken(); + req.setContinuationToken(token); + } while (result.isTruncated()); + return listOfMediaUrls; + } } diff --git a/src/main/java/org/avniproject/etl/domain/OrganisationIdentity.java b/src/main/java/org/avniproject/etl/domain/OrganisationIdentity.java index dd164b7..f973599 100644 --- a/src/main/java/org/avniproject/etl/domain/OrganisationIdentity.java +++ b/src/main/java/org/avniproject/etl/domain/OrganisationIdentity.java @@ -9,30 +9,33 @@ public class OrganisationIdentity { private final String dbUser; private final String schemaName; private final String schemaUser; + private final String mediaDirectory; private List orgGroupOrgDbUsers; private Date startTime; - private OrganisationIdentity(String dbUser, String schemaName, String schemaUser) { + private OrganisationIdentity(String dbUser, String schemaName, String schemaUser, String mediaDirectory) { this.dbUser = dbUser; this.schemaName = schemaName; this.schemaUser = schemaUser; + this.mediaDirectory = mediaDirectory; } /** * @param dbUser The database user who has access the source data of the organisation and is also a schema user * @param schemaName The destination schema name to be created/updated */ - public static OrganisationIdentity createForOrganisation(String dbUser, String schemaName) { - return new OrganisationIdentity(dbUser, schemaName, dbUser); + public static OrganisationIdentity createForOrganisation(String dbUser, String schemaName, String mediaDirectory) { + return new OrganisationIdentity(dbUser, schemaName, dbUser, mediaDirectory); } + /** * @param dbUser The database user who has access the source data of the organisation * @param schemaName The destination schema name to be created/updated * @param schemaUser The destination user who will have access to the schema */ public static OrganisationIdentity createForOrganisationGroup(String dbUser, String schemaName, String schemaUser) { - return new OrganisationIdentity(dbUser, schemaName, schemaUser); + return new OrganisationIdentity(dbUser, schemaName, schemaUser, null); } public String getSchemaName() { @@ -41,7 +44,7 @@ public String getSchemaName() { @Override public String toString() { - return String.format("Schema: %s, DB User: %s, Schema User: %s", schemaName, dbUser, schemaUser); + return String.format("Schema: %s, DB User: %s, Schema User: %s, MediaDirectory: %s", schemaName, dbUser, schemaUser, mediaDirectory); } public String getDbUser() { @@ -71,4 +74,8 @@ public Date getStartTime() { public void setStartTime(Date startTime) { this.startTime = startTime; } + + public String getMediaDirectory() { + return mediaDirectory; + } } diff --git a/src/main/java/org/avniproject/etl/repository/OrganisationRepository.java b/src/main/java/org/avniproject/etl/repository/OrganisationRepository.java index 1e96408..a97045d 100644 --- a/src/main/java/org/avniproject/etl/repository/OrganisationRepository.java +++ 
b/src/main/java/org/avniproject/etl/repository/OrganisationRepository.java @@ -20,7 +20,7 @@ public OrganisationRepository(JdbcTemplate jdbcTemplate) { } public OrganisationIdentity getOrganisation(String organisationUUID) { - String query = "select db_user, schema_name\n" + + String query = "select db_user, schema_name, media_directory\n" + "from organisation where uuid = ? and is_voided = false"; List organisationIdentities = jdbcTemplate.query(query, ps -> ps.setString(1, organisationUUID), new OrganisationIdentityRowMapper()); if (organisationIdentities.size() == 0) return null; diff --git a/src/main/java/org/avniproject/etl/repository/rowMappers/OrganisationIdentityRowMapper.java b/src/main/java/org/avniproject/etl/repository/rowMappers/OrganisationIdentityRowMapper.java index 4bcc9e8..6aaeac3 100644 --- a/src/main/java/org/avniproject/etl/repository/rowMappers/OrganisationIdentityRowMapper.java +++ b/src/main/java/org/avniproject/etl/repository/rowMappers/OrganisationIdentityRowMapper.java @@ -10,6 +10,6 @@ public class OrganisationIdentityRowMapper implements RowMapper listOfAllMediaUrls = amazonClientService.listObjectsInBucket(getMediaDirectory(organisation), "thumbnails"); + ArrayList listOfAllMediaUrlsIncludingThumbnails = amazonClientService.listObjectsInBucket(getMediaDirectory(organisation), ""); + ArrayList listOfAllThumbnailsUrls = amazonClientService.listObjectsInBucket(getThumbnailsDirectory(organisation), ""); + log.info(String.format("listOfAllMediaUrls %d listOfAllMediaUrlsIncludingThumbnails %d listOfAllThumbnailsUrls %d", listOfAllMediaUrls.size(), listOfAllMediaUrlsIncludingThumbnails.size(), listOfAllThumbnailsUrls.size())); log.info(String.format("Completed Media Analysis for schema %s with dbUser %s and schemaUser %s", organisationIdentity.getSchemaName(), organisationIdentity.getDbUser(), organisationIdentity.getSchemaUser())); OrgIdentityContextHolder.setContext(organisationIdentity, etlServiceConfig); } + + private String getThumbnailsDirectory(Organisation organisation) { + return getMediaDirectory(organisation) + "/thumbnails"; + } + + private String getMediaDirectory(Organisation organisation) { + return organisation.getOrganisationIdentity().getMediaDirectory(); + } } From 6a03e374677911d69ed0f357aa3e32744ac661e0 Mon Sep 17 00:00:00 2001 From: himeshr Date: Wed, 17 Jul 2024 11:19:39 +0530 Subject: [PATCH 03/14] #102 | [WIP] Add TODOs before putting on hold --- .../etl/config/AmazonClientService.java | 14 ++------------ .../backgroundJob/EtlJobController.java | 1 + .../etl/service/MediaAnalysisService.java | 19 ++++++++++++++----- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/avniproject/etl/config/AmazonClientService.java b/src/main/java/org/avniproject/etl/config/AmazonClientService.java index 4f1626e..e51efe2 100644 --- a/src/main/java/org/avniproject/etl/config/AmazonClientService.java +++ b/src/main/java/org/avniproject/etl/config/AmazonClientService.java @@ -74,24 +74,14 @@ private Date getExpireDate(long expireDuration) { return expiration; } - public ArrayList listObjectsInBucket(String s3PathPrefix, String negativeFilterPatternString) { - Boolean isNegativePatternFilteringRequired = StringUtils.hasText(negativeFilterPatternString); - Pattern negativeFilterPattern = Pattern.compile(negativeFilterPatternString); - Predicate negativeFilterPatternPredicate = negativeFilterPattern.asPredicate(); - Pattern uuidRegexPattern = 
Pattern.compile("^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"); - Predicate uuidRegexPatternPredicate = uuidRegexPattern.asPredicate(); - + public ArrayList listObjectsInBucket(String s3PathPrefix) { ArrayList listOfMediaUrls = new ArrayList<>(); ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(bucketName).withPrefix(s3PathPrefix).withMaxKeys(MAX_KEYS); ListObjectsV2Result result; do { result = s3Client.listObjectsV2(req); for (S3ObjectSummary objectSummary : result.getObjectSummaries()) { -// System.out.printf(" - %s (size: %d)\n", objectSummary.getKey(), objectSummary.getSize()); - if (uuidRegexPatternPredicate.test(objectSummary.getKey().substring(objectSummary.getKey().lastIndexOf("/") + 1)) - && !(isNegativePatternFilteringRequired && negativeFilterPatternPredicate.test(objectSummary.getKey()))) { - listOfMediaUrls.add(objectSummary.getKey()); - } + listOfMediaUrls.add(objectSummary.getKey()); } // If there are more than maxKeys keys in the bucket, get a continuation token // and list the next objects. diff --git a/src/main/java/org/avniproject/etl/controller/backgroundJob/EtlJobController.java b/src/main/java/org/avniproject/etl/controller/backgroundJob/EtlJobController.java index 284c05e..78ff80a 100644 --- a/src/main/java/org/avniproject/etl/controller/backgroundJob/EtlJobController.java +++ b/src/main/java/org/avniproject/etl/controller/backgroundJob/EtlJobController.java @@ -76,6 +76,7 @@ public ResponseEntity createJob(@RequestBody JobScheduleRequest jobScheduleReque EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(jobScheduleRequest.getEntityUUID(), jobScheduleRequest.getJobGroup()); if (latestJobRun != null) return ResponseEntity.badRequest().body("Job already present"); + //TODO Add validation for !jobScheduleRequest.getJobGroup().equals(JobGroup.Sync) to check that JobGroup.Sync is already scheduled JobDetailImpl jobDetail = getJobDetail(jobScheduleRequest, organisationIdentity, organisationIdentitiesInGroup); scheduler.addJob(jobDetail, false); diff --git a/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java b/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java index d703b34..25cb593 100644 --- a/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java +++ b/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java @@ -7,9 +7,11 @@ import org.avniproject.etl.domain.Organisation; import org.avniproject.etl.domain.OrganisationIdentity; import org.avniproject.etl.repository.OrganisationRepository; +import org.glassfish.jaxb.core.v2.TODO; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.swing.text.html.HTML; import java.util.ArrayList; import java.util.List; @@ -51,11 +53,18 @@ public void runFor(OrganisationIdentity organisationIdentity) { log.info(String.format("Running Media Analysis for %s", organisationIdentity.toString())); OrgIdentityContextHolder.setContext(organisationIdentity, etlServiceConfig); Organisation organisation = organisationFactory.create(organisationIdentity); -// TODO - ArrayList listOfAllMediaUrls = amazonClientService.listObjectsInBucket(getMediaDirectory(organisation), "thumbnails"); - ArrayList listOfAllMediaUrlsIncludingThumbnails = amazonClientService.listObjectsInBucket(getMediaDirectory(organisation), ""); - ArrayList listOfAllThumbnailsUrls = amazonClientService.listObjectsInBucket(getThumbnailsDirectory(organisation), ""); - 
log.info(String.format("listOfAllMediaUrls %d listOfAllMediaUrlsIncludingThumbnails %d listOfAllThumbnailsUrls %d", listOfAllMediaUrls.size(), listOfAllMediaUrlsIncludingThumbnails.size(), listOfAllThumbnailsUrls.size())); + ArrayList listOfAllMediaUrls = amazonClientService.listObjectsInBucket(getMediaDirectory(organisation)); + //TODO Fix test issues causing build break + //TODO Make use of listOfAllMediaUrls to come up with required subset of URLs, like thumbnails, media after validating UUID and excluding Mobile and Adhoc entries + //TODO Log entries that get filtered out for dev purposes + ArrayList listOfAllMediaUrlsExcludingThumbnails = new ArrayList<>(); + ArrayList listOfAllThumbnailsUrls = new ArrayList<>(); + // TODO: 17/07/24 Fetch list of MediaUrls from media table + // SELECT REPLACE(image_url, 'https://s3.ap-south-1.amazonaws.com/prod-user-media/goonj/', '') as image_url_in_media_table + // FROM goonj.media + // ORDER BY REPLACE(image_url, 'https://s3.ap-south-1.amazonaws.com/prod-user-media/goonj/', ''); + // TODO: 17/07/24 Invoke Analysis method to perform various metrics computations for each entry in media table of the org + log.info(String.format("listOfAllMediaUrls %d listOfAllMediaUrlsExcludingThumbnails %d listOfAllThumbnailsUrls %d", listOfAllMediaUrls.size(), listOfAllMediaUrlsExcludingThumbnails.size(), listOfAllThumbnailsUrls.size())); log.info(String.format("Completed Media Analysis for schema %s with dbUser %s and schemaUser %s", organisationIdentity.getSchemaName(), organisationIdentity.getDbUser(), organisationIdentity.getSchemaUser())); OrgIdentityContextHolder.setContext(organisationIdentity, etlServiceConfig); } From 8084e4b6987f785ada6f8f3dfd83ffe1c1b3474c Mon Sep 17 00:00:00 2001 From: himeshr Date: Wed, 17 Jul 2024 12:32:27 +0530 Subject: [PATCH 04/14] #102 | Fix Build break --- .../avniproject/etl/DataSyncIntegrationTest.java | 14 +++++++------- .../etl/builder/OrganisationIdentityBuilder.java | 8 +++++++- .../avniproject/etl/domain/TableMetadataTest.java | 2 +- .../etl/domain/metadata/ColumnMetadataTest.java | 2 +- .../etl/domain/metadata/diff/AddColumnTest.java | 4 ++-- .../etl/domain/metadata/diff/CreateTableTest.java | 4 ++-- .../etl/domain/metadata/diff/RenameColumnTest.java | 2 +- .../etl/domain/metadata/diff/RenameTableTest.java | 2 +- .../etl/repository/AvniMetadataRepositoryTest.java | 6 +++--- .../repository/SchemaMetadataRepositoryTest.java | 2 +- .../etl/service/EtlServiceIntegrationTest.java | 4 ++-- 11 files changed, 28 insertions(+), 22 deletions(-) diff --git a/src/test/java/org/avniproject/etl/DataSyncIntegrationTest.java b/src/test/java/org/avniproject/etl/DataSyncIntegrationTest.java index 6098a74..7aa8465 100644 --- a/src/test/java/org/avniproject/etl/DataSyncIntegrationTest.java +++ b/src/test/java/org/avniproject/etl/DataSyncIntegrationTest.java @@ -37,7 +37,7 @@ private String getCurrentTime(long subtractSeconds) { } private void runDataSync() { - etlService.runFor(OrganisationIdentity.createForOrganisation("orgc", "orgc")); + etlService.runFor(OrganisationIdentity.createForOrganisation("orgc", "orgc", "orgc")); } private List> getPersons() { @@ -213,10 +213,10 @@ public void allTheDBUserOfOrgGroupAreAbleToQueryTables() { etlService.runForOrganisationGroup("og"); etlService.runForOrganisationGroup("og"); - etlService.runFor(OrganisationIdentity.createForOrganisation("ogi1", "ogi1")); + etlService.runFor(OrganisationIdentity.createForOrganisation("ogi1", "ogi1", "ogi1")); etlService.runForOrganisationGroup("og"); - 
etlService.runFor(OrganisationIdentity.createForOrganisation("ogi1", "ogi1")); + etlService.runFor(OrganisationIdentity.createForOrganisation("ogi1", "ogi1", "ogi1")); jdbcTemplate.execute("set role og;"); List> groupList = jdbcTemplate.queryForList("select * from og.person;"); @@ -236,13 +236,13 @@ public void allTheDBUserOfOrgGroupAreAbleToQueryTables() { @Sql(scripts = {"/test-data-teardown.sql", "/organisation-group-teardown.sql"}, executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD) public void multipleRunsShouldNotCauseDuplicateDataInOrganisationsAndGroups() { - etlService.runFor(OrganisationIdentity.createForOrganisation("ogi2", "ogi2")); + etlService.runFor(OrganisationIdentity.createForOrganisation("ogi2", "ogi2", "ogi2")); etlService.runForOrganisationGroup("og"); - etlService.runFor(OrganisationIdentity.createForOrganisation("ogi1", "ogi1")); + etlService.runFor(OrganisationIdentity.createForOrganisation("ogi1", "ogi1", "ogi2")); - etlService.runFor(OrganisationIdentity.createForOrganisation("ogi2", "ogi2")); + etlService.runFor(OrganisationIdentity.createForOrganisation("ogi2", "ogi2", "ogi2")); etlService.runForOrganisationGroup("og"); - etlService.runFor(OrganisationIdentity.createForOrganisation("ogi1", "ogi1")); + etlService.runFor(OrganisationIdentity.createForOrganisation("ogi1", "ogi1", "ogi2")); jdbcTemplate.execute("set role og;"); List> groupList = jdbcTemplate.queryForList("select * from og.person;"); diff --git a/src/test/java/org/avniproject/etl/builder/OrganisationIdentityBuilder.java b/src/test/java/org/avniproject/etl/builder/OrganisationIdentityBuilder.java index c0dd086..c188a5f 100644 --- a/src/test/java/org/avniproject/etl/builder/OrganisationIdentityBuilder.java +++ b/src/test/java/org/avniproject/etl/builder/OrganisationIdentityBuilder.java @@ -6,6 +6,7 @@ public class OrganisationIdentityBuilder { private Integer id = 1; private String dbUser = "dbUser"; private String schemaName = "schema"; + private String mediaDirectory = "mediaDirectory"; public OrganisationIdentityBuilder withId(Integer id) { this.id = id; @@ -13,7 +14,7 @@ public OrganisationIdentityBuilder withId(Integer id) { } public OrganisationIdentity build() { - return OrganisationIdentity.createForOrganisation(dbUser, schemaName); + return OrganisationIdentity.createForOrganisation(dbUser, schemaName, mediaDirectory); } public OrganisationIdentityBuilder withDbUser(String dbUser) { @@ -25,4 +26,9 @@ public OrganisationIdentityBuilder withSchemaName(String schemaName) { this.schemaName = schemaName; return this; } + + public OrganisationIdentityBuilder withMediaDirectory(String mediaDirectory) { + this.mediaDirectory = mediaDirectory; + return this; + } } diff --git a/src/test/java/org/avniproject/etl/domain/TableMetadataTest.java b/src/test/java/org/avniproject/etl/domain/TableMetadataTest.java index d0114b8..dbe662a 100644 --- a/src/test/java/org/avniproject/etl/domain/TableMetadataTest.java +++ b/src/test/java/org/avniproject/etl/domain/TableMetadataTest.java @@ -74,7 +74,7 @@ public void shouldRenameTableIfNecessary() { @Test public void shouldAddColumnIfMissing() { - OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema")); + OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema", "mediaDirectory")); TableMetadata oldTable = new TableMetadataBuilder().forPerson().build(); TableMetadata newTable = new TableMetadataBuilder().forPerson().build(); newTable.addColumnMetadata(List.of(new ColumnMetadata(new 
Column("newColumn", Column.Type.text), 24, ColumnMetadata.ConceptType.Text, UUID.randomUUID().toString()))); diff --git a/src/test/java/org/avniproject/etl/domain/metadata/ColumnMetadataTest.java b/src/test/java/org/avniproject/etl/domain/metadata/ColumnMetadataTest.java index 40c1407..64779cc 100644 --- a/src/test/java/org/avniproject/etl/domain/metadata/ColumnMetadataTest.java +++ b/src/test/java/org/avniproject/etl/domain/metadata/ColumnMetadataTest.java @@ -17,7 +17,7 @@ class ColumnMetadataTest { @Test public void shouldRenameColumnIfNecessary() { - OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema")); + OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema", "mediaDirectory")); String uuid = UUID.randomUUID().toString(); ColumnMetadata oldColumnMetadata = new ColumnMetadata(new Column("oldName", Column.Type.text), 12, ColumnMetadata.ConceptType.Text, uuid); ColumnMetadata newColumnMetadata = new ColumnMetadata(new Column("newName", Column.Type.text), 12, ColumnMetadata.ConceptType.Text, uuid); diff --git a/src/test/java/org/avniproject/etl/domain/metadata/diff/AddColumnTest.java b/src/test/java/org/avniproject/etl/domain/metadata/diff/AddColumnTest.java index bd39f76..dfa6709 100644 --- a/src/test/java/org/avniproject/etl/domain/metadata/diff/AddColumnTest.java +++ b/src/test/java/org/avniproject/etl/domain/metadata/diff/AddColumnTest.java @@ -13,7 +13,7 @@ public class AddColumnTest { @Test public void shouldAddColumn() { - OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema")); + OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema", "mediaDirectory")); AddColumn addColumn = new AddColumn("table", new Column("name", Column.Type.text)); assertThat(addColumn.getSql(), is("alter table \"schema\".table add column \"name\" text;")); } @@ -30,7 +30,7 @@ public void checkNameLengthBeforeShortening() { public void shouldAddColumnWithLargeNameAfterShortening() { String columnName = "Total silt requested by the family members – Number of trolleys"; String shortenedColumnName = "Total silt requested by the family members – Nu (1206887472)"; - OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema")); + OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema", "mediaDirectory")); AddColumn addColumn = new AddColumn("table", new Column(columnName, Column.Type.text)); assertThat(addColumn.getSql(), is("alter table \"schema\".table add column \""+shortenedColumnName+"\" text;")); } diff --git a/src/test/java/org/avniproject/etl/domain/metadata/diff/CreateTableTest.java b/src/test/java/org/avniproject/etl/domain/metadata/diff/CreateTableTest.java index b2d34dc..e6a4f63 100644 --- a/src/test/java/org/avniproject/etl/domain/metadata/diff/CreateTableTest.java +++ b/src/test/java/org/avniproject/etl/domain/metadata/diff/CreateTableTest.java @@ -14,12 +14,12 @@ public class CreateTableTest { @BeforeEach public void before() { - OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema")); + OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema", "mediaDirectory")); } @Test public void shouldCreateSqlWithCommonColumns() { - OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema")); + 
OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema", "mediaDirectory")); TableMetadata tableMetadata = new TableMetadata(); tableMetadata.setName("tableName"); diff --git a/src/test/java/org/avniproject/etl/domain/metadata/diff/RenameColumnTest.java b/src/test/java/org/avniproject/etl/domain/metadata/diff/RenameColumnTest.java index b97f0df..72805e4 100644 --- a/src/test/java/org/avniproject/etl/domain/metadata/diff/RenameColumnTest.java +++ b/src/test/java/org/avniproject/etl/domain/metadata/diff/RenameColumnTest.java @@ -11,7 +11,7 @@ class RenameColumnTest { @Test public void shouldCreateSqlForRenamingColumn() { - OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema")); + OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema", "mediaDirectory")); RenameColumn renameColumn = new RenameColumn("table_name", "oldName", "newName"); System.out.println(renameColumn.getSql()); assertThat(renameColumn.getSql(), is("alter table \"schema\".table_name rename column \"oldName\" to \"newName\";")); diff --git a/src/test/java/org/avniproject/etl/domain/metadata/diff/RenameTableTest.java b/src/test/java/org/avniproject/etl/domain/metadata/diff/RenameTableTest.java index 55fb18b..d6dc0f3 100644 --- a/src/test/java/org/avniproject/etl/domain/metadata/diff/RenameTableTest.java +++ b/src/test/java/org/avniproject/etl/domain/metadata/diff/RenameTableTest.java @@ -11,7 +11,7 @@ public class RenameTableTest { @Test public void shouldRenameTable() { - OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema")); + OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("dbUser", "schema", "mediaDirectory")); RenameTable renameTable = new RenameTable("old_name", "new_name"); assertThat(renameTable.getSql(), is("alter table \"schema\".old_name rename to new_name;")); } diff --git a/src/test/java/org/avniproject/etl/repository/AvniMetadataRepositoryTest.java b/src/test/java/org/avniproject/etl/repository/AvniMetadataRepositoryTest.java index d20a94b..6c52b2f 100644 --- a/src/test/java/org/avniproject/etl/repository/AvniMetadataRepositoryTest.java +++ b/src/test/java/org/avniproject/etl/repository/AvniMetadataRepositoryTest.java @@ -22,7 +22,7 @@ public class AvniMetadataRepositoryTest extends BaseIntegrationTest { @Sql({"/test-data-teardown.sql", "/test-data.sql"}) @Sql(scripts = {"/test-data-teardown.sql"}, executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD) public void shouldGetConceptName() { - OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("orgc", "orgc")); + OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("orgc", "orgc", "orgc")); String conceptName = avniMetadataRepository.conceptName("f005ccf7-f714-4615-a2a0-26efa2da6491"); assertThat(conceptName, is("Numeric Question")); } @@ -31,7 +31,7 @@ public void shouldGetConceptName() { @Sql({"/test-data-teardown.sql", "/test-data.sql"}) @Sql(scripts = {"/test-data-teardown.sql"}, executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD) public void shouldLookForConceptNameWithinOrganisation() { - OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("orgb", "orgb")); + OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("orgb", "orgb", "orgb")); assertThrows(Exception.class, () -> avniMetadataRepository.conceptName("f005ccf7-f714-4615-a2a0-26efa2da6491")); } 
@@ -39,7 +39,7 @@ public void shouldLookForConceptNameWithinOrganisation() { @Sql({"/test-data-teardown.sql", "/test-data.sql"}) @Sql(scripts = {"/test-data-teardown.sql"}, executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD) public void shouldRetrieveSyncRegistrationConcepts() { - OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("orgc", "orgc")); + OrgIdentityContextHolder.setContext(OrganisationIdentity.createForOrganisation("orgc", "orgc", "orgc")); SyncRegistrationConcept[] syncRegistrationConcepts = avniMetadataRepository.findSyncRegistrationConcepts("a95d8951-17e4-408d-98b0-ef3a6c982b96"); SyncRegistrationConcept concept1 = syncRegistrationConcepts[0]; assertThat(concept1.getUuid(), is("701b68df-dc52-4d69-ab91-f03a70ac1bbc")); diff --git a/src/test/java/org/avniproject/etl/repository/SchemaMetadataRepositoryTest.java b/src/test/java/org/avniproject/etl/repository/SchemaMetadataRepositoryTest.java index 3e32aae..d62dc3c 100644 --- a/src/test/java/org/avniproject/etl/repository/SchemaMetadataRepositoryTest.java +++ b/src/test/java/org/avniproject/etl/repository/SchemaMetadataRepositoryTest.java @@ -24,7 +24,7 @@ public class SchemaMetadataRepositoryTest extends BaseIntegrationTest { @BeforeEach public void before() { - OrganisationIdentity orgb = OrganisationIdentity.createForOrganisation("orgc", "orgc"); + OrganisationIdentity orgb = OrganisationIdentity.createForOrganisation("orgc", "orgc", "orgc"); OrgIdentityContextHolder.setContext(orgb); } diff --git a/src/test/java/org/avniproject/etl/service/EtlServiceIntegrationTest.java b/src/test/java/org/avniproject/etl/service/EtlServiceIntegrationTest.java index 3f163fa..b40aaae 100644 --- a/src/test/java/org/avniproject/etl/service/EtlServiceIntegrationTest.java +++ b/src/test/java/org/avniproject/etl/service/EtlServiceIntegrationTest.java @@ -22,8 +22,8 @@ public class EtlServiceIntegrationTest extends BaseIntegrationTest { @Sql(scripts = {"/test-data-teardown.sql"}, executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD) public void shouldNotFailWhenRunTwice() { - etlService.runFor(OrganisationIdentity.createForOrganisation("orgc", "orgc")); - etlService.runFor(OrganisationIdentity.createForOrganisation("orgc", "orgc")); + etlService.runFor(OrganisationIdentity.createForOrganisation("orgc", "orgc", "orgc")); + etlService.runFor(OrganisationIdentity.createForOrganisation("orgc", "orgc", "orgc")); assertThat(countOfRowsIn("orgc.goat"), equalTo(1L)); assertThat(countOfRowsIn("orgc.household"), equalTo(1L)); From fbb927c772cb364c2271e56d328f6cfcf5b09195 Mon Sep 17 00:00:00 2001 From: himeshr Date: Fri, 19 Jul 2024 15:14:20 +0530 Subject: [PATCH 05/14] #102 | Fail triggering MediaAnalysis job, if sync job isn't running - Consolidate all Create Job requests validation - Truncate jobDetail description --- .../backgroundJob/EtlJobController.java | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/avniproject/etl/controller/backgroundJob/EtlJobController.java b/src/main/java/org/avniproject/etl/controller/backgroundJob/EtlJobController.java index c109a72..ab34577 100644 --- a/src/main/java/org/avniproject/etl/controller/backgroundJob/EtlJobController.java +++ b/src/main/java/org/avniproject/etl/controller/backgroundJob/EtlJobController.java @@ -16,6 +16,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.util.StringUtils; 
import org.springframework.web.bind.annotation.*; import java.util.ArrayList; @@ -71,12 +72,8 @@ public ResponseEntity createJob(@RequestBody JobScheduleRequest jobScheduleReque else organisationIdentitiesInGroup = organisationRepository.getOrganisationGroup(jobScheduleRequest.getEntityUUID()); - if (organisationIdentity == null && organisationIdentitiesInGroup.size() == 0) - return ResponseEntity.badRequest().body(String.format("No such organisation or group exists: %s", jobScheduleRequest.getEntityUUID())); - - EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(jobScheduleRequest.getEntityUUID(), jobScheduleRequest.getJobGroup()); - if (latestJobRun != null) return ResponseEntity.badRequest().body("Job already present"); - //TODO Add validation for !jobScheduleRequest.getJobGroup().equals(JobGroup.Sync) to check that JobGroup.Sync is already scheduled + ResponseEntity jobScheduleValidationResult = validateRequest(jobScheduleRequest, organisationIdentity, organisationIdentitiesInGroup); + if (jobScheduleValidationResult != null) return jobScheduleValidationResult; JobDetailImpl jobDetail = getJobDetail(jobScheduleRequest, organisationIdentity, organisationIdentitiesInGroup); scheduler.addJob(jobDetail, false); @@ -86,6 +83,20 @@ public ResponseEntity createJob(@RequestBody JobScheduleRequest jobScheduleReque return ResponseEntity.ok().body("Job Scheduled!"); } + private ResponseEntity validateRequest(JobScheduleRequest jobScheduleRequest, OrganisationIdentity organisationIdentity, List organisationIdentitiesInGroup) throws SchedulerException { + if (organisationIdentity == null && organisationIdentitiesInGroup.size() == 0) { + return ResponseEntity.badRequest().body(String.format("No such organisation or group exists: %s", jobScheduleRequest.getEntityUUID())); + } + EtlJobSummary latestJobRun = scheduledJobService.getLatestJobRun(jobScheduleRequest.getEntityUUID(), jobScheduleRequest.getJobGroup()); + if (latestJobRun != null) return ResponseEntity.badRequest().body("Job already present"); + if (!jobScheduleRequest.getJobGroup().equals(JobGroup.Sync)) { + EtlJobSummary correspondingSyncJobRun = scheduledJobService.getLatestJobRun(jobScheduleRequest.getEntityUUID(), JobGroup.Sync); + if (correspondingSyncJobRun == null) + return ResponseEntity.badRequest().body("Sync Job has not been triggered for this Org / OrgGroup"); + } + return null; + } + private Trigger getTrigger(JobScheduleRequest jobScheduleRequest, JobDetailImpl jobDetail) { SimpleScheduleBuilder scheduleBuilder = simpleSchedule().withIntervalInMinutes(jobScheduleRequest.getJobGroup().equals(JobGroup.Sync) ? scheduledJobConfig.getSyncRepeatIntervalInMinutes() : scheduledJobConfig.getMediaAnalysisRepeatIntervalInMinutes()).repeatForever(); @@ -98,7 +109,8 @@ private JobDetailImpl getJobDetail(JobScheduleRequest jobScheduleRequest, Organi jobDetail.setJobClass(jobScheduleRequest.getJobGroup().equals(JobGroup.Sync) ? EtlJob.class : MediaAnalysisJob.class); jobDetail.setDurability(true); jobDetail.setKey(scheduledJobConfig.getJobKey(jobScheduleRequest.getEntityUUID(), jobScheduleRequest.getJobGroup())); - jobDetail.setDescription(organisationIdentity == null ? 
organisationIdentitiesInGroup.stream().map(OrganisationIdentity::getSchemaName).collect(Collectors.joining(";")) : organisationIdentity.getSchemaName()); + String truncatedDescription = getTruncatedDescription(organisationIdentity, organisationIdentitiesInGroup); + jobDetail.setDescription(truncatedDescription); jobDetail.setGroup(jobScheduleRequest.getJobGroup().getGroupName()); jobDetail.setName(jobScheduleRequest.getEntityUUID()); JobDataMap jobDataMap = scheduledJobConfig.createJobData(jobScheduleRequest.getJobEntityType()); @@ -106,6 +118,12 @@ private JobDetailImpl getJobDetail(JobScheduleRequest jobScheduleRequest, Organi return jobDetail; } + private String getTruncatedDescription(OrganisationIdentity organisationIdentity, List organisationIdentitiesInGroup) { + String orgGroupSchemaNames = organisationIdentitiesInGroup.stream().map(OrganisationIdentity::getSchemaName).collect(Collectors.joining(";")); + String description = organisationIdentity == null ? "OrgGroup Schema names: " + orgGroupSchemaNames : organisationIdentity.toString(); + return StringUtils.truncate(description, 240); + } + @PreAuthorize("hasAnyAuthority('admin')") @DeleteMapping(value = "/job/{id}") public String deleteJob(@PathVariable String id, @RequestParam(value="jobGroup", required = false) JobGroup jobGroup) throws SchedulerException { From a7bc035ccd31d566725ade0d1e0c95a37626ebe1 Mon Sep 17 00:00:00 2001 From: himeshr Date: Fri, 19 Jul 2024 15:16:08 +0530 Subject: [PATCH 06/14] #102 | Partition MediaURLs, into thumbnails and media after validating for UUID and excluding Mobile and Adhoc entries --- .../etl/service/MediaAnalysisService.java | 43 +++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java b/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java index 25cb593..e3f2e54 100644 --- a/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java +++ b/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java @@ -7,16 +7,20 @@ import org.avniproject.etl.domain.Organisation; import org.avniproject.etl.domain.OrganisationIdentity; import org.avniproject.etl.repository.OrganisationRepository; -import org.glassfish.jaxb.core.v2.TODO; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import javax.swing.text.html.HTML; -import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; @Service public class MediaAnalysisService { + public static final String THUMBNAILS_PATTERN = "thumbnails"; + public static final String ADHOC_MOBILE_DB_BACKUP_PATTERN = "Adhoc|MobileDbBackup"; + public static final String UUID_V4_PATTERN = "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"; private final OrganisationRepository organisationRepository; private final OrganisationFactory organisationFactory; private final SchemaMigrationService schemaMigrationService; @@ -53,24 +57,39 @@ public void runFor(OrganisationIdentity organisationIdentity) { log.info(String.format("Running Media Analysis for %s", organisationIdentity.toString())); OrgIdentityContextHolder.setContext(organisationIdentity, etlServiceConfig); Organisation organisation = organisationFactory.create(organisationIdentity); - ArrayList listOfAllMediaUrls = amazonClientService.listObjectsInBucket(getMediaDirectory(organisation)); - //TODO 
Fix test issues causing build break - //TODO Make use of listOfAllMediaUrls to come up with required subset of URLs, like thumbnails, media after validating UUID and excluding Mobile and Adhoc entries + + List listOfAllMediaUrls = fetchValidMediaUrlsFromStorage(organisation); + Map> partitionResults = partitionListBasedOnThumbnailsPattern(listOfAllMediaUrls); + List listOfAllThumbnailsUrls = partitionResults.get(Boolean.TRUE); + List listOfAllMediaUrlsExcludingThumbnails = partitionResults.get(Boolean.FALSE); + log.info(String.format("listOfAllMediaUrls %d listOfAllMediaUrlsExcludingThumbnails %d listOfAllThumbnailsUrls %d", listOfAllMediaUrls.size(), listOfAllMediaUrlsExcludingThumbnails.size(), listOfAllThumbnailsUrls.size())); + //TODO Log entries that get filtered out for dev purposes - ArrayList listOfAllMediaUrlsExcludingThumbnails = new ArrayList<>(); - ArrayList listOfAllThumbnailsUrls = new ArrayList<>(); // TODO: 17/07/24 Fetch list of MediaUrls from media table // SELECT REPLACE(image_url, 'https://s3.ap-south-1.amazonaws.com/prod-user-media/goonj/', '') as image_url_in_media_table // FROM goonj.media // ORDER BY REPLACE(image_url, 'https://s3.ap-south-1.amazonaws.com/prod-user-media/goonj/', ''); // TODO: 17/07/24 Invoke Analysis method to perform various metrics computations for each entry in media table of the org - log.info(String.format("listOfAllMediaUrls %d listOfAllMediaUrlsExcludingThumbnails %d listOfAllThumbnailsUrls %d", listOfAllMediaUrls.size(), listOfAllMediaUrlsExcludingThumbnails.size(), listOfAllThumbnailsUrls.size())); + //TODO Fix test issues causing build break log.info(String.format("Completed Media Analysis for schema %s with dbUser %s and schemaUser %s", organisationIdentity.getSchemaName(), organisationIdentity.getDbUser(), organisationIdentity.getSchemaUser())); - OrgIdentityContextHolder.setContext(organisationIdentity, etlServiceConfig); } - private String getThumbnailsDirectory(Organisation organisation) { - return getMediaDirectory(organisation) + "/thumbnails"; + private List fetchValidMediaUrlsFromStorage(Organisation organisation) { + List listOfAllMediaUrls = amazonClientService.listObjectsInBucket(getMediaDirectory(organisation)); + filterOutNonMediaUrls(listOfAllMediaUrls); + return listOfAllMediaUrls; + } + + private void filterOutNonMediaUrls(List listOfAllMediaUrls) { + Predicate fastSyncAndAdhocDumpPatternPredicate = Pattern.compile(ADHOC_MOBILE_DB_BACKUP_PATTERN, Pattern.CASE_INSENSITIVE).asPredicate(); + Predicate notUUIDPatternPredicate = Pattern.compile(UUID_V4_PATTERN).asPredicate().negate(); + listOfAllMediaUrls.removeIf(fastSyncAndAdhocDumpPatternPredicate.or(notUUIDPatternPredicate)); + } + + private Map> partitionListBasedOnThumbnailsPattern(List listOfAllMediaUrls) { + Predicate thumbnailsPatternPredicate = Pattern.compile(THUMBNAILS_PATTERN, Pattern.CASE_INSENSITIVE).asPredicate(); + Map> partitionResults= listOfAllMediaUrls.stream().collect(Collectors.partitioningBy(thumbnailsPatternPredicate)); + return partitionResults; } private String getMediaDirectory(Organisation organisation) { From 2e01c57af486e562df88759412e36576d6f9e768 Mon Sep 17 00:00:00 2001 From: himeshr Date: Mon, 22 Jul 2024 19:21:23 +0530 Subject: [PATCH 07/14] #102 | Refactor MediaAnalysis flow --- .../etl/domain/metadata/SchemaMetadata.java | 8 + .../etl/domain/metadata/TableMetadata.java | 13 ++ .../avniproject/etl/dto/MediaAnalysisVO.java | 53 +++++++ .../etl/repository/MediaTableRepository.java | 8 +- .../repository/SchemaMetadataRepository.java | 7 +- 
.../MediaAnalysisTableMetadataBuilder.java | 19 +++ .../tableMappers/MediaAnalysisTable.java | 27 ++++ .../service/MediaTableRepositoryService.java | 22 ++- .../sql/MediaSearchQueryBuilder.java | 8 + .../MediaAnalysisTableRegenerateAction.java | 143 ++++++++++++++++++ .../etl/service/MediaAnalysisService.java | 66 ++------ .../avniproject/etl/service/SyncService.java | 3 +- src/main/resources/sql/api/searchMedia.sql.st | 2 + .../resources/sql/etl/mediaAnalysis.sql.st | 3 + .../SchemaMetadataRepositoryTest.java | 11 ++ 15 files changed, 325 insertions(+), 68 deletions(-) create mode 100644 src/main/java/org/avniproject/etl/dto/MediaAnalysisVO.java create mode 100644 src/main/java/org/avniproject/etl/repository/rowMappers/MediaAnalysisTableMetadataBuilder.java create mode 100644 src/main/java/org/avniproject/etl/repository/rowMappers/tableMappers/MediaAnalysisTable.java create mode 100644 src/main/java/org/avniproject/etl/repository/sync/MediaAnalysisTableRegenerateAction.java create mode 100644 src/main/resources/sql/etl/mediaAnalysis.sql.st diff --git a/src/main/java/org/avniproject/etl/domain/metadata/SchemaMetadata.java b/src/main/java/org/avniproject/etl/domain/metadata/SchemaMetadata.java index 1d563e2..cfd6438 100644 --- a/src/main/java/org/avniproject/etl/domain/metadata/SchemaMetadata.java +++ b/src/main/java/org/avniproject/etl/domain/metadata/SchemaMetadata.java @@ -97,6 +97,14 @@ public List getAllEncounterTableNames() { return encounterTableNames; } + public Optional getMediaTable() { + return tableMetadata.stream().filter(TableMetadata::isMediaTable).findFirst(); + } + + public Optional getMediaAnalysisTable() { + return tableMetadata.stream().filter(TableMetadata::isMediaAnalysisTable).findFirst(); + } + private List findChanges(SchemaMetadata currentSchema, TableMetadata newTable) { List diffs = new ArrayList<>(); Optional optionalMatchingTable = currentSchema.findMatchingTable(newTable); diff --git a/src/main/java/org/avniproject/etl/domain/metadata/TableMetadata.java b/src/main/java/org/avniproject/etl/domain/metadata/TableMetadata.java index e685644..b6e1fb4 100644 --- a/src/main/java/org/avniproject/etl/domain/metadata/TableMetadata.java +++ b/src/main/java/org/avniproject/etl/domain/metadata/TableMetadata.java @@ -247,6 +247,7 @@ public enum Type { IndividualEncounterCancellation, Address, Media, + MediaAnalysis, ManualProgramEnrolmentEligibility, GroupToMember, HouseholdToMember, @@ -273,6 +274,18 @@ public boolean isSubjectTable() { return Arrays.asList(Type.Individual, Type.Person, Type.Household, Type.Group).contains(this.type); } + public boolean isMediaTable() { + return (Type.Media).equals(this.type); + } + + public boolean isMediaAnalysisTable() { + return (Type.MediaAnalysis).equals(this.type); + } + + public boolean isPartOfRegularSync() { + return !isMediaAnalysisTable(); + } + private void addIndexMetadata(IndexMetadata indexMetadata) { this.indexMetadataList.add(indexMetadata); } diff --git a/src/main/java/org/avniproject/etl/dto/MediaAnalysisVO.java b/src/main/java/org/avniproject/etl/dto/MediaAnalysisVO.java new file mode 100644 index 0000000..7732432 --- /dev/null +++ b/src/main/java/org/avniproject/etl/dto/MediaAnalysisVO.java @@ -0,0 +1,53 @@ +package org.avniproject.etl.dto; + +import java.util.Objects; + +public class MediaAnalysisVO { + + String uuid; + String image_url; + boolean isValidUrl; + boolean isPresentInStorage; + boolean isThumbnailGenerated; + + public MediaAnalysisVO(String uuid, String image_url, boolean isValidUrl, boolean 
isPresentInStorage, boolean isThumbnailGenerated) { + this.uuid = uuid; + this.image_url = image_url; + this.isValidUrl = isValidUrl; + this.isPresentInStorage = isPresentInStorage; + this.isThumbnailGenerated = isThumbnailGenerated; + } + + public String getUuid() { + return uuid; + } + + public String getImage_url() { + return image_url; + } + + public boolean isValidUrl() { + return isValidUrl; + } + + public boolean isPresentInStorage() { + return isPresentInStorage; + } + + public boolean isThumbnailGenerated() { + return isThumbnailGenerated; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof MediaAnalysisVO)) return false; + MediaAnalysisVO that = (MediaAnalysisVO) o; + return uuid.equals(that.uuid); + } + + @Override + public int hashCode() { + return Objects.hash(uuid); + } +} diff --git a/src/main/java/org/avniproject/etl/repository/MediaTableRepository.java b/src/main/java/org/avniproject/etl/repository/MediaTableRepository.java index 2d119a8..77b6274 100644 --- a/src/main/java/org/avniproject/etl/repository/MediaTableRepository.java +++ b/src/main/java/org/avniproject/etl/repository/MediaTableRepository.java @@ -84,9 +84,13 @@ private List searchInternal(MediaSearchRequest mediaSearchRequest, Page p .query(query.sql(), query.parameters(), rowMapper), jdbcTemplate); } - - public List getImageData(MediaSearchRequest mediaSearchRequest, Page page) { return searchInternal(mediaSearchRequest, page, (rs, rowNum) -> mediaTableRepositoryService.setImageData(rs)); } + + public List getAllMedia() { + Query query = new MediaSearchQueryBuilder().allWithoutAnyLimitOrOffset().build(); + return runInSchemaUserContext(() -> new NamedParameterJdbcTemplate(jdbcTemplate) + .query(query.sql(), query.parameters(), (rs, rowNum) -> mediaTableRepositoryService.setMediaDto(rs, false)), jdbcTemplate); + } } diff --git a/src/main/java/org/avniproject/etl/repository/SchemaMetadataRepository.java b/src/main/java/org/avniproject/etl/repository/SchemaMetadataRepository.java index 9427615..da5a2f4 100644 --- a/src/main/java/org/avniproject/etl/repository/SchemaMetadataRepository.java +++ b/src/main/java/org/avniproject/etl/repository/SchemaMetadataRepository.java @@ -6,11 +6,7 @@ import org.avniproject.etl.domain.metadata.SchemaMetadata; import org.avniproject.etl.domain.metadata.TableMetadata; import org.avniproject.etl.domain.metadata.diff.Diff; -import org.avniproject.etl.repository.rowMappers.ColumnMetadataMapper; -import org.avniproject.etl.repository.rowMappers.MediaTableMetadataBuilder; -import org.avniproject.etl.repository.rowMappers.SyncTelemetryTableMetadataBuilder; -import org.avniproject.etl.repository.rowMappers.UserTableMetadataBuilder; -import org.avniproject.etl.repository.rowMappers.TableMetadataMapper; +import org.avniproject.etl.repository.rowMappers.*; import org.avniproject.etl.repository.rowMappers.tableMappers.AddressTable; import org.avniproject.etl.repository.rowMappers.tableMappers.ChecklistTable; import org.springframework.beans.factory.annotation.Autowired; @@ -45,6 +41,7 @@ public SchemaMetadata getNewSchemaMetadata() { List tables = new ArrayList<>(getFormTables()); tables.add(getAddressTable()); tables.add(MediaTableMetadataBuilder.build()); + tables.add(MediaAnalysisTableMetadataBuilder.build()); tables.add(SyncTelemetryTableMetadataBuilder.build()); tables.add(UserTableMetadataBuilder.build()); tables.addAll(getGroupSubjectTables()); diff --git 
a/src/main/java/org/avniproject/etl/repository/rowMappers/MediaAnalysisTableMetadataBuilder.java b/src/main/java/org/avniproject/etl/repository/rowMappers/MediaAnalysisTableMetadataBuilder.java new file mode 100644 index 0000000..2f953fe --- /dev/null +++ b/src/main/java/org/avniproject/etl/repository/rowMappers/MediaAnalysisTableMetadataBuilder.java @@ -0,0 +1,19 @@ +package org.avniproject.etl.repository.rowMappers; + +import org.avniproject.etl.domain.metadata.ColumnMetadata; +import org.avniproject.etl.domain.metadata.TableMetadata; +import org.avniproject.etl.repository.rowMappers.tableMappers.MediaAnalysisTable; + +import java.util.stream.Collectors; + +public class MediaAnalysisTableMetadataBuilder { + public static TableMetadata build() { + TableMetadata mediaAnalysisTableMetadata = new TableMetadata(); + MediaAnalysisTable mediaAnalysisTable = new MediaAnalysisTable(); + mediaAnalysisTableMetadata.setName(mediaAnalysisTable.name(null)); + mediaAnalysisTableMetadata.setType(TableMetadata.Type.MediaAnalysis); + mediaAnalysisTableMetadata.addColumnMetadata(mediaAnalysisTable.columns().stream().map(column -> new ColumnMetadata(column, null, null, null)).collect(Collectors.toList())); + + return mediaAnalysisTableMetadata; + } +} diff --git a/src/main/java/org/avniproject/etl/repository/rowMappers/tableMappers/MediaAnalysisTable.java b/src/main/java/org/avniproject/etl/repository/rowMappers/tableMappers/MediaAnalysisTable.java new file mode 100644 index 0000000..13b6031 --- /dev/null +++ b/src/main/java/org/avniproject/etl/repository/rowMappers/tableMappers/MediaAnalysisTable.java @@ -0,0 +1,27 @@ +package org.avniproject.etl.repository.rowMappers.tableMappers; + +import org.avniproject.etl.domain.metadata.Column; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class MediaAnalysisTable extends Table { + @Override + public String name(Map tableDetails) { + return "media_analysis"; + } + + @Override + public List columns() { + return new Columns() + .withColumns(Arrays.asList( + new Column("uuid", Column.Type.text, Column.ColumnType.index), + new Column("image_url", Column.Type.text), + new Column("is_valid_url", Column.Type.bool), + new Column("is_present_in_storage", Column.Type.bool), + new Column("is_thumbnail_generated", Column.Type.bool) + )) + .build(); + } +} \ No newline at end of file diff --git a/src/main/java/org/avniproject/etl/repository/service/MediaTableRepositoryService.java b/src/main/java/org/avniproject/etl/repository/service/MediaTableRepositoryService.java index 6d9f04d..4ade50e 100644 --- a/src/main/java/org/avniproject/etl/repository/service/MediaTableRepositoryService.java +++ b/src/main/java/org/avniproject/etl/repository/service/MediaTableRepositoryService.java @@ -24,22 +24,28 @@ public MediaTableRepositoryService(AmazonClientService amazonClientService) { } public MediaDTO setMediaDto(ResultSet rs) { + return this.setMediaDto(rs, true); + } + + public MediaDTO setMediaDto(ResultSet rs, boolean generateSignedUrls) { try { String imageUrl = rs.getString("image_url"); String thumbnailUrl = Utils.getThumbnailUrl(imageUrl); URL signedImageUrl = null, signedThumbnailUrl = null; - try { - signedImageUrl = amazonClientService.generateMediaDownloadUrl(imageUrl); + if(generateSignedUrls) { try { - signedThumbnailUrl = amazonClientService.generateMediaDownloadUrl(thumbnailUrl); - } catch (S3FileDoesNotExist ignored) { + signedImageUrl = amazonClientService.generateMediaDownloadUrl(imageUrl); + try { + signedThumbnailUrl = 
amazonClientService.generateMediaDownloadUrl(thumbnailUrl); + } catch (S3FileDoesNotExist ignored) { + } + } catch (IllegalArgumentException illegalArgumentException) { + //Ignore and move on. Image will be null + } catch (S3FileDoesNotExist e) { + throw new RuntimeException(e); } - } catch (IllegalArgumentException illegalArgumentException) { - //Ignore and move on. Image will be null - } catch (S3FileDoesNotExist e) { - throw new RuntimeException(e); } String uuid = rs.getString("uuid"); diff --git a/src/main/java/org/avniproject/etl/repository/sql/MediaSearchQueryBuilder.java b/src/main/java/org/avniproject/etl/repository/sql/MediaSearchQueryBuilder.java index 092820d..314fe35 100644 --- a/src/main/java/org/avniproject/etl/repository/sql/MediaSearchQueryBuilder.java +++ b/src/main/java/org/avniproject/etl/repository/sql/MediaSearchQueryBuilder.java @@ -72,6 +72,14 @@ public MediaSearchQueryBuilder withPage(Page page) { return this; } + public MediaSearchQueryBuilder allWithoutAnyLimitOrOffset() { + template.add("joinTablesAndColumns", null); + template.add("request", null); + parameters.put("offset", 0); + parameters.put("limit", Long.MAX_VALUE); + return this; + } + public Query build() { String str = template.render(); logger.debug(str); diff --git a/src/main/java/org/avniproject/etl/repository/sync/MediaAnalysisTableRegenerateAction.java b/src/main/java/org/avniproject/etl/repository/sync/MediaAnalysisTableRegenerateAction.java new file mode 100644 index 0000000..981b081 --- /dev/null +++ b/src/main/java/org/avniproject/etl/repository/sync/MediaAnalysisTableRegenerateAction.java @@ -0,0 +1,143 @@ +package org.avniproject.etl.repository.sync; + +import org.apache.log4j.Logger; +import org.avniproject.etl.config.AmazonClientService; +import org.avniproject.etl.domain.NullObject; +import org.avniproject.etl.domain.OrgIdentityContextHolder; +import org.avniproject.etl.domain.Organisation; +import org.avniproject.etl.domain.metadata.TableMetadata; +import org.avniproject.etl.dto.MediaAnalysisVO; +import org.avniproject.etl.dto.MediaDTO; +import org.avniproject.etl.repository.MediaTableRepository; +import org.avniproject.etl.repository.sql.SqlFile; +import org.avniproject.etl.service.MediaAnalysisService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter; +import org.springframework.stereotype.Repository; +import org.stringtemplate.v4.ST; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.avniproject.etl.repository.JdbcContextWrapper.runInOrgContext; + +@Repository +public class MediaAnalysisTableRegenerateAction { + public static final String THUMBNAILS_PATTERN = "thumbnails"; + public static final String ADHOC_MOBILE_DB_BACKUP_PATTERN = "Adhoc|MobileDbBackup"; + public static final String UUID_V4_PATTERN = "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"; + + private final AmazonClientService amazonClientService; + private final MediaTableRepository mediaTableRepository; + private final JdbcTemplate jdbcTemplate; + private static final String generateMediaAnalysisTableTemplate = SqlFile.readSqlFile("mediaAnalysis.sql.st"); + + private static final Logger log = 
Logger.getLogger(MediaAnalysisService.class); + + @Autowired + public MediaAnalysisTableRegenerateAction(AmazonClientService amazonClientService, MediaTableRepository mediaTableRepository, JdbcTemplate jdbcTemplate) { + this.amazonClientService = amazonClientService; + this.mediaTableRepository = mediaTableRepository; + this.jdbcTemplate = jdbcTemplate; + } + + public void process(Organisation organisation, TableMetadata tableMetadata) { + + List listOfAllMediaUrls = fetchValidMediaUrlsFromStorage(organisation); + Map> partitionResults = partitionListBasedOnThumbnailsPattern(listOfAllMediaUrls); + List listOfAllThumbnailsUrls = partitionResults.get(Boolean.TRUE); + List listOfAllMediaUrlsExcludingThumbnails = partitionResults.get(Boolean.FALSE); + log.info(String.format("listOfAllMediaUrls %d listOfAllMediaUrlsExcludingThumbnails %d listOfAllThumbnailsUrls %d", listOfAllMediaUrls.size(), listOfAllMediaUrlsExcludingThumbnails.size(), listOfAllThumbnailsUrls.size())); + + //TODO Log entries that get filtered out for dev purposes + // TODO: 17/07/24 Fetch list of MediaUrls from media table + // SELECT REPLACE(image_url, 'https://s3.ap-south-1.amazonaws.com/prod-user-media/goonj/', '') as image_url_in_media_table + // FROM goonj.media + // ORDER BY REPLACE(image_url, 'https://s3.ap-south-1.amazonaws.com/prod-user-media/goonj/', ''); + // TODO: 17/07/24 Invoke Analysis method to perform various metrics computations for each entry in media table of the org + //TODO Fix test issues causing build break + List listOfMediaEntities = mediaTableRepository.getAllMedia(); + String orgMediaDirectory = organisation.getOrganisationIdentity().getMediaDirectory(); + // TODO: 22/07/24 do + List mediaAnalysisVOS = listOfMediaEntities.stream().map(mediaDTO -> { + boolean isValidUrl = mediaDTO.url().contains(orgMediaDirectory); + String urlToSearch = mediaDTO.url().substring(mediaDTO.url().indexOf(orgMediaDirectory)); + boolean isPresentInStorage = listOfAllMediaUrlsExcludingThumbnails.contains(urlToSearch); + // TODO: 22/07/24 init booleans correctly + return new MediaAnalysisVO(mediaDTO.uuid(), mediaDTO.url(), isValidUrl, isPresentInStorage, false); + }).collect(Collectors.toList()); + log.info(String.format("listOfMediaEntities %d mediaAnalysisVOS %d ", listOfMediaEntities.size(), mediaAnalysisVOS.size())); + + truncateMediaAnalysisTable(tableMetadata); + generateMediaAnalysisTableEntries(tableMetadata, mediaAnalysisVOS); + } + + private void truncateMediaAnalysisTable(TableMetadata tableMetadata) { + String schema = OrgIdentityContextHolder.getDbSchema(); + String mediaAnalysisTable = tableMetadata.getName(); + String sql = new ST("delete from . 
where uuid is not null;") + .add("schemaName", wrapInQuotes(schema)) + .add("mediaAnalysisTable", wrapInQuotes(mediaAnalysisTable)) + .render(); + runInOrgContext(() -> { + jdbcTemplate.execute(sql); + return NullObject.instance(); + }, jdbcTemplate); + } + + private List fetchValidMediaUrlsFromStorage(Organisation organisation) { + List listOfAllMediaUrls = amazonClientService.listObjectsInBucket(getMediaDirectory(organisation)); + filterOutNonMediaUrls(listOfAllMediaUrls); + return listOfAllMediaUrls; + } + + private void filterOutNonMediaUrls(List listOfAllMediaUrls) { + Predicate fastSyncAndAdhocDumpPatternPredicate = Pattern.compile(ADHOC_MOBILE_DB_BACKUP_PATTERN, Pattern.CASE_INSENSITIVE).asPredicate(); + Predicate notUUIDPatternPredicate = Pattern.compile(UUID_V4_PATTERN).asPredicate().negate(); + listOfAllMediaUrls.removeIf(fastSyncAndAdhocDumpPatternPredicate.or(notUUIDPatternPredicate)); + } + + private Map> partitionListBasedOnThumbnailsPattern(List listOfAllMediaUrls) { + Predicate thumbnailsPatternPredicate = Pattern.compile(THUMBNAILS_PATTERN, Pattern.CASE_INSENSITIVE).asPredicate(); + Map> partitionResults= listOfAllMediaUrls.stream().collect(Collectors.partitioningBy(thumbnailsPatternPredicate)); + return partitionResults; + } + + private String getMediaDirectory(Organisation organisation) { + return organisation.getOrganisationIdentity().getMediaDirectory(); + } + + private void generateMediaAnalysisTableEntries(TableMetadata tableMetadata, List mediaAnalysisVOS) { + String schema = OrgIdentityContextHolder.getDbSchema(); + String mediaAnalysisTable = tableMetadata.getName(); + String sql = new ST(generateMediaAnalysisTableTemplate) + .add("schemaName", wrapInQuotes(schema)) + .add("mediaAnalysisTable", wrapInQuotes(mediaAnalysisTable)) + .render(); + runInOrgContext(() -> { + jdbcTemplate.batchUpdate(sql, + mediaAnalysisVOS, + 100, + (ps, mediaAnalysisVO) -> { + ps.setString(1, mediaAnalysisVO.getUuid()); + ps.setString(2, mediaAnalysisVO.getImage_url()); + ps.setBoolean(3, mediaAnalysisVO.isValidUrl()); + ps.setBoolean(4, mediaAnalysisVO.isPresentInStorage()); + ps.setBoolean(5, mediaAnalysisVO.isThumbnailGenerated()); + }); + return NullObject.instance(); + }, jdbcTemplate); + } + + private String wrapInQuotes(String parameter) { + return parameter == null ? 
"null" : "\"" + parameter + "\""; + } + +} diff --git a/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java b/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java index e3f2e54..a181550 100644 --- a/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java +++ b/src/main/java/org/avniproject/etl/service/MediaAnalysisService.java @@ -1,42 +1,34 @@ package org.avniproject.etl.service; import org.apache.log4j.Logger; -import org.avniproject.etl.config.AmazonClientService; import org.avniproject.etl.config.EtlServiceConfig; import org.avniproject.etl.domain.OrgIdentityContextHolder; import org.avniproject.etl.domain.Organisation; import org.avniproject.etl.domain.OrganisationIdentity; +import org.avniproject.etl.domain.metadata.TableMetadata; import org.avniproject.etl.repository.OrganisationRepository; +import org.avniproject.etl.repository.sync.MediaAnalysisTableRegenerateAction; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; -import java.util.Map; -import java.util.function.Predicate; -import java.util.regex.Pattern; -import java.util.stream.Collectors; +import java.util.Optional; @Service public class MediaAnalysisService { - public static final String THUMBNAILS_PATTERN = "thumbnails"; - public static final String ADHOC_MOBILE_DB_BACKUP_PATTERN = "Adhoc|MobileDbBackup"; - public static final String UUID_V4_PATTERN = "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"; + private final MediaAnalysisTableRegenerateAction mediaAnalysisTableRegenerateAction; private final OrganisationRepository organisationRepository; private final OrganisationFactory organisationFactory; - private final SchemaMigrationService schemaMigrationService; - private final SyncService syncService; private final EtlServiceConfig etlServiceConfig; - private final AmazonClientService amazonClientService; private static final Logger log = Logger.getLogger(MediaAnalysisService.class); @Autowired - public MediaAnalysisService(OrganisationRepository organisationRepository, OrganisationFactory organisationFactory, SchemaMigrationService schemaMigrationService, SyncService syncService, EtlServiceConfig etlServiceConfig, AmazonClientService amazonClientService) { + public MediaAnalysisService(MediaAnalysisTableRegenerateAction mediaAnalysisTableRegenerateAction, OrganisationRepository organisationRepository, OrganisationFactory organisationFactory, + EtlServiceConfig etlServiceConfig) { + this.mediaAnalysisTableRegenerateAction = mediaAnalysisTableRegenerateAction; this.organisationRepository = organisationRepository; this.organisationFactory = organisationFactory; - this.schemaMigrationService = schemaMigrationService; - this.syncService = syncService; this.etlServiceConfig = etlServiceConfig; - this.amazonClientService = amazonClientService; } public void runFor(String organisationUUID) { @@ -57,42 +49,12 @@ public void runFor(OrganisationIdentity organisationIdentity) { log.info(String.format("Running Media Analysis for %s", organisationIdentity.toString())); OrgIdentityContextHolder.setContext(organisationIdentity, etlServiceConfig); Organisation organisation = organisationFactory.create(organisationIdentity); - - List listOfAllMediaUrls = fetchValidMediaUrlsFromStorage(organisation); - Map> partitionResults = partitionListBasedOnThumbnailsPattern(listOfAllMediaUrls); - List listOfAllThumbnailsUrls = partitionResults.get(Boolean.TRUE); - List 
listOfAllMediaUrlsExcludingThumbnails = partitionResults.get(Boolean.FALSE); - log.info(String.format("listOfAllMediaUrls %d listOfAllMediaUrlsExcludingThumbnails %d listOfAllThumbnailsUrls %d", listOfAllMediaUrls.size(), listOfAllMediaUrlsExcludingThumbnails.size(), listOfAllThumbnailsUrls.size())); - - //TODO Log entries that get filtered out for dev purposes - // TODO: 17/07/24 Fetch list of MediaUrls from media table - // SELECT REPLACE(image_url, 'https://s3.ap-south-1.amazonaws.com/prod-user-media/goonj/', '') as image_url_in_media_table - // FROM goonj.media - // ORDER BY REPLACE(image_url, 'https://s3.ap-south-1.amazonaws.com/prod-user-media/goonj/', ''); - // TODO: 17/07/24 Invoke Analysis method to perform various metrics computations for each entry in media table of the org - //TODO Fix test issues causing build break + Optional mediaAnalysisTableMetadata = organisation.getSchemaMetadata().getMediaAnalysisTable(); + if(!mediaAnalysisTableMetadata.isPresent()) { + log.error(String.format("Sync job hasn't yet run for schema %s with dbUser %s and schemaUser %s", organisationIdentity.getSchemaName(), organisationIdentity.getDbUser(), organisationIdentity.getSchemaUser())); + return; + } + mediaAnalysisTableRegenerateAction.process(organisation,mediaAnalysisTableMetadata.get()); log.info(String.format("Completed Media Analysis for schema %s with dbUser %s and schemaUser %s", organisationIdentity.getSchemaName(), organisationIdentity.getDbUser(), organisationIdentity.getSchemaUser())); } - - private List fetchValidMediaUrlsFromStorage(Organisation organisation) { - List listOfAllMediaUrls = amazonClientService.listObjectsInBucket(getMediaDirectory(organisation)); - filterOutNonMediaUrls(listOfAllMediaUrls); - return listOfAllMediaUrls; - } - - private void filterOutNonMediaUrls(List listOfAllMediaUrls) { - Predicate fastSyncAndAdhocDumpPatternPredicate = Pattern.compile(ADHOC_MOBILE_DB_BACKUP_PATTERN, Pattern.CASE_INSENSITIVE).asPredicate(); - Predicate notUUIDPatternPredicate = Pattern.compile(UUID_V4_PATTERN).asPredicate().negate(); - listOfAllMediaUrls.removeIf(fastSyncAndAdhocDumpPatternPredicate.or(notUUIDPatternPredicate)); - } - - private Map> partitionListBasedOnThumbnailsPattern(List listOfAllMediaUrls) { - Predicate thumbnailsPatternPredicate = Pattern.compile(THUMBNAILS_PATTERN, Pattern.CASE_INSENSITIVE).asPredicate(); - Map> partitionResults= listOfAllMediaUrls.stream().collect(Collectors.partitioningBy(thumbnailsPatternPredicate)); - return partitionResults; - } - - private String getMediaDirectory(Organisation organisation) { - return organisation.getOrganisationIdentity().getMediaDirectory(); - } -} +} \ No newline at end of file diff --git a/src/main/java/org/avniproject/etl/service/SyncService.java b/src/main/java/org/avniproject/etl/service/SyncService.java index 4dcb1b5..5353721 100644 --- a/src/main/java/org/avniproject/etl/service/SyncService.java +++ b/src/main/java/org/avniproject/etl/service/SyncService.java @@ -36,7 +36,8 @@ public SyncService(EntityRepository entityRepository, EntitySyncStatusRepository @Transactional(propagation = Propagation.REQUIRES_NEW) public void sync(Organisation organisation) { SchemaMetadata currentSchemaMetadata = organisation.getSchemaMetadata(); - currentSchemaMetadata.getOrderedTableMetadata().forEach(tableMetadata -> migrateTable(tableMetadata, organisation.getSyncStatus(), currentSchemaMetadata)); + currentSchemaMetadata.getOrderedTableMetadata().stream().filter(TableMetadata::isPartOfRegularSync) + .forEach(tableMetadata -> 
migrateTable(tableMetadata, organisation.getSyncStatus(), currentSchemaMetadata)); } @Transactional diff --git a/src/main/resources/sql/api/searchMedia.sql.st b/src/main/resources/sql/api/searchMedia.sql.st index 99c56e4..0af9af9 100644 --- a/src/main/resources/sql/api/searchMedia.sql.st +++ b/src/main/resources/sql/api/searchMedia.sql.st @@ -41,6 +41,7 @@ FROM .media media where media.image_url is not null and media.is_voided is false + and media.created_date_time >= :fromDate and media.created_date_time \<= :toDate and ( and ( address." id" in (:addressLevelIds_)1 = 2}; separator="\n OR "> ) + ORDER BY media.created_date_time desc LIMIT :limit OFFSET :offset; diff --git a/src/main/resources/sql/etl/mediaAnalysis.sql.st b/src/main/resources/sql/etl/mediaAnalysis.sql.st new file mode 100644 index 0000000..40193cb --- /dev/null +++ b/src/main/resources/sql/etl/mediaAnalysis.sql.st @@ -0,0 +1,3 @@ +insert into . (uuid, image_url, + is_valid_url, is_present_in_storage, is_thumbnail_generated) +VALUES (?, ?, ?, ?, ?); \ No newline at end of file diff --git a/src/test/java/org/avniproject/etl/repository/SchemaMetadataRepositoryTest.java b/src/test/java/org/avniproject/etl/repository/SchemaMetadataRepositoryTest.java index d62dc3c..ef55f16 100644 --- a/src/test/java/org/avniproject/etl/repository/SchemaMetadataRepositoryTest.java +++ b/src/test/java/org/avniproject/etl/repository/SchemaMetadataRepositoryTest.java @@ -79,6 +79,17 @@ public void shouldGetMediaTable() { assertThat(mediaTable.isPresent(), is(true)); } + @Test + @Sql({"/test-data-teardown.sql", "/test-data.sql"}) + @Sql(scripts = {"/test-data-teardown.sql"}, executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD) + public void shouldGetMediaAnalysisTable() { + SchemaMetadata schemaMetadata = schemaMetadataRepository.getExistingSchemaMetadata(); + Optional mediaAnalysis = schemaMetadata.getTableMetadata().stream().filter(tableMetadata1 -> tableMetadata1.getName().equals("media_analysis")).findFirst(); + + assertThat(mediaAnalysis.isPresent(), is(true)); + } + + @Test @Sql({"/test-data-teardown.sql", "/test-data.sql"}) @Sql(scripts = {"/test-data-teardown.sql"}, executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD) From 7c6cf5e85251b66efd599706a86ad5a8ac3a4d3f Mon Sep 17 00:00:00 2001 From: himeshr Date: Wed, 24 Jul 2024 12:43:38 +0530 Subject: [PATCH 08/14] #102 | Implement logic to perform analysis of media entities --- .../avniproject/etl/dto/MediaAnalysisVO.java | 12 +++- .../tableMappers/MediaAnalysisTable.java | 3 +- .../MediaAnalysisTableRegenerateAction.java | 56 +++++++++++-------- .../resources/sql/etl/mediaAnalysis.sql.st | 4 +- 4 files changed, 45 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/avniproject/etl/dto/MediaAnalysisVO.java b/src/main/java/org/avniproject/etl/dto/MediaAnalysisVO.java index 7732432..5193ba3 100644 --- a/src/main/java/org/avniproject/etl/dto/MediaAnalysisVO.java +++ b/src/main/java/org/avniproject/etl/dto/MediaAnalysisVO.java @@ -9,13 +9,15 @@ public class MediaAnalysisVO { boolean isValidUrl; boolean isPresentInStorage; boolean isThumbnailGenerated; + boolean isHavingDuplicates; - public MediaAnalysisVO(String uuid, String image_url, boolean isValidUrl, boolean isPresentInStorage, boolean isThumbnailGenerated) { + public MediaAnalysisVO(String uuid, String image_url, boolean isValidUrl, boolean isPresentInStorage, boolean isThumbnailGenerated, boolean isHavingDuplicates) { this.uuid = uuid; this.image_url = image_url; this.isValidUrl = isValidUrl; this.isPresentInStorage = 
isPresentInStorage; this.isThumbnailGenerated = isThumbnailGenerated; + this.isHavingDuplicates = isHavingDuplicates; } public String getUuid() { @@ -38,16 +40,20 @@ public boolean isThumbnailGenerated() { return isThumbnailGenerated; } + public boolean isHavingDuplicates() { + return isHavingDuplicates; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (!(o instanceof MediaAnalysisVO)) return false; MediaAnalysisVO that = (MediaAnalysisVO) o; - return uuid.equals(that.uuid); + return getUuid().equals(that.getUuid()) && getImage_url().equals(that.getImage_url()); } @Override public int hashCode() { - return Objects.hash(uuid); + return Objects.hash(getUuid(), getImage_url()); } } diff --git a/src/main/java/org/avniproject/etl/repository/rowMappers/tableMappers/MediaAnalysisTable.java b/src/main/java/org/avniproject/etl/repository/rowMappers/tableMappers/MediaAnalysisTable.java index 13b6031..12f8327 100644 --- a/src/main/java/org/avniproject/etl/repository/rowMappers/tableMappers/MediaAnalysisTable.java +++ b/src/main/java/org/avniproject/etl/repository/rowMappers/tableMappers/MediaAnalysisTable.java @@ -20,7 +20,8 @@ public List columns() { new Column("image_url", Column.Type.text), new Column("is_valid_url", Column.Type.bool), new Column("is_present_in_storage", Column.Type.bool), - new Column("is_thumbnail_generated", Column.Type.bool) + new Column("is_thumbnail_generated", Column.Type.bool), + new Column("is_having_duplicates", Column.Type.bool) )) .build(); } diff --git a/src/main/java/org/avniproject/etl/repository/sync/MediaAnalysisTableRegenerateAction.java b/src/main/java/org/avniproject/etl/repository/sync/MediaAnalysisTableRegenerateAction.java index 981b081..60abd1f 100644 --- a/src/main/java/org/avniproject/etl/repository/sync/MediaAnalysisTableRegenerateAction.java +++ b/src/main/java/org/avniproject/etl/repository/sync/MediaAnalysisTableRegenerateAction.java @@ -12,16 +12,13 @@ import org.avniproject.etl.repository.sql.SqlFile; import org.avniproject.etl.service.MediaAnalysisService; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter; import org.springframework.stereotype.Repository; import org.stringtemplate.v4.ST; -import java.sql.PreparedStatement; -import java.sql.SQLException; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.function.Predicate; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -33,6 +30,13 @@ public class MediaAnalysisTableRegenerateAction { public static final String THUMBNAILS_PATTERN = "thumbnails"; public static final String ADHOC_MOBILE_DB_BACKUP_PATTERN = "Adhoc|MobileDbBackup"; public static final String UUID_V4_PATTERN = "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"; + public static final String STRING_CONST_SEPARATOR = "/"; + public static final String TRUNCATE_MEDIA_ANALYSIS_TABLE_SQL = "delete from . 
where uuid is not null;"; + public static final String SCHEMA_NAME = "schemaName"; + public static final String MEDIA_ANALYSIS_TABLE = "mediaAnalysisTable"; + public static final int INT_CONSTANT_ZERO = 0; + public static final String COMPOSITE_UUID_SEPARATOR = "#"; + public static final int INT_CONSTANT_ONE = 1; private final AmazonClientService amazonClientService; private final MediaTableRepository mediaTableRepository; @@ -56,24 +60,27 @@ public void process(Organisation organisation, TableMetadata tableMetadata) { List listOfAllMediaUrlsExcludingThumbnails = partitionResults.get(Boolean.FALSE); log.info(String.format("listOfAllMediaUrls %d listOfAllMediaUrlsExcludingThumbnails %d listOfAllThumbnailsUrls %d", listOfAllMediaUrls.size(), listOfAllMediaUrlsExcludingThumbnails.size(), listOfAllThumbnailsUrls.size())); - //TODO Log entries that get filtered out for dev purposes - // TODO: 17/07/24 Fetch list of MediaUrls from media table - // SELECT REPLACE(image_url, 'https://s3.ap-south-1.amazonaws.com/prod-user-media/goonj/', '') as image_url_in_media_table - // FROM goonj.media - // ORDER BY REPLACE(image_url, 'https://s3.ap-south-1.amazonaws.com/prod-user-media/goonj/', ''); - // TODO: 17/07/24 Invoke Analysis method to perform various metrics computations for each entry in media table of the org - //TODO Fix test issues causing build break - List listOfMediaEntities = mediaTableRepository.getAllMedia(); String orgMediaDirectory = organisation.getOrganisationIdentity().getMediaDirectory(); - // TODO: 22/07/24 do - List mediaAnalysisVOS = listOfMediaEntities.stream().map(mediaDTO -> { + List listOfMediaDTOEntities = mediaTableRepository.getAllMedia(); + Map mediaUrlsMap = listOfAllMediaUrlsExcludingThumbnails.stream().collect(Collectors.toMap(mediaUrl -> mediaUrl.substring(mediaUrl.lastIndexOf(STRING_CONST_SEPARATOR)), Function.identity())); + Map thumbnailUrlsMap = listOfAllThumbnailsUrls.stream().collect(Collectors.toMap(thumbnailUrl -> thumbnailUrl.substring(thumbnailUrl.lastIndexOf(STRING_CONST_SEPARATOR)), Function.identity())); + + Map> groupedMediaEntityMap = listOfMediaDTOEntities.stream() + .collect(Collectors.groupingBy(mediaDTO -> mediaDTO.uuid())); //mediaDTO.uuid() returns a composite uuid of entity.uuid#media.uuid + List mediaAnalysisVOS = groupedMediaEntityMap.entrySet().stream().map(groupedMediaEntityMapEntry -> { + MediaDTO mediaDTO = groupedMediaEntityMapEntry.getValue().get(INT_CONSTANT_ZERO); + boolean isPresentInStorage = false, isThumbnailGenerated = false; boolean isValidUrl = mediaDTO.url().contains(orgMediaDirectory); - String urlToSearch = mediaDTO.url().substring(mediaDTO.url().indexOf(orgMediaDirectory)); - boolean isPresentInStorage = listOfAllMediaUrlsExcludingThumbnails.contains(urlToSearch); - // TODO: 22/07/24 init booleans correctly - return new MediaAnalysisVO(mediaDTO.uuid(), mediaDTO.url(), isValidUrl, isPresentInStorage, false); + if (isValidUrl) { + String urlToSearch = mediaDTO.url().substring(mediaDTO.url().lastIndexOf(STRING_CONST_SEPARATOR)); + isPresentInStorage = mediaUrlsMap.containsKey(urlToSearch); + isThumbnailGenerated = thumbnailUrlsMap.containsKey(urlToSearch); + } + return new MediaAnalysisVO(mediaDTO.uuid().substring(INT_CONSTANT_ZERO,mediaDTO.uuid().indexOf(COMPOSITE_UUID_SEPARATOR)), + mediaDTO.url(), isValidUrl, isPresentInStorage, isThumbnailGenerated, + groupedMediaEntityMapEntry.getValue().size() > INT_CONSTANT_ONE); }).collect(Collectors.toList()); - log.info(String.format("listOfMediaEntities %d mediaAnalysisVOS %d ", 
listOfMediaEntities.size(), mediaAnalysisVOS.size())); + log.info(String.format("listOfMediaDTOEntities %d mediaAnalysisVOS %d duplicates %d", listOfMediaDTOEntities.size(), mediaAnalysisVOS.size(), listOfMediaDTOEntities.size() - mediaAnalysisVOS.size())); truncateMediaAnalysisTable(tableMetadata); generateMediaAnalysisTableEntries(tableMetadata, mediaAnalysisVOS); @@ -82,9 +89,9 @@ public void process(Organisation organisation, TableMetadata tableMetadata) { private void truncateMediaAnalysisTable(TableMetadata tableMetadata) { String schema = OrgIdentityContextHolder.getDbSchema(); String mediaAnalysisTable = tableMetadata.getName(); - String sql = new ST("delete from . where uuid is not null;") - .add("schemaName", wrapInQuotes(schema)) - .add("mediaAnalysisTable", wrapInQuotes(mediaAnalysisTable)) + String sql = new ST(TRUNCATE_MEDIA_ANALYSIS_TABLE_SQL) + .add(SCHEMA_NAME, wrapInQuotes(schema)) + .add(MEDIA_ANALYSIS_TABLE, wrapInQuotes(mediaAnalysisTable)) .render(); runInOrgContext(() -> { jdbcTemplate.execute(sql); @@ -118,8 +125,8 @@ private void generateMediaAnalysisTableEntries(TableMetadata tableMetadata, List String schema = OrgIdentityContextHolder.getDbSchema(); String mediaAnalysisTable = tableMetadata.getName(); String sql = new ST(generateMediaAnalysisTableTemplate) - .add("schemaName", wrapInQuotes(schema)) - .add("mediaAnalysisTable", wrapInQuotes(mediaAnalysisTable)) + .add(SCHEMA_NAME, wrapInQuotes(schema)) + .add(MEDIA_ANALYSIS_TABLE, wrapInQuotes(mediaAnalysisTable)) .render(); runInOrgContext(() -> { jdbcTemplate.batchUpdate(sql, @@ -131,6 +138,7 @@ private void generateMediaAnalysisTableEntries(TableMetadata tableMetadata, List ps.setBoolean(3, mediaAnalysisVO.isValidUrl()); ps.setBoolean(4, mediaAnalysisVO.isPresentInStorage()); ps.setBoolean(5, mediaAnalysisVO.isThumbnailGenerated()); + ps.setBoolean(6, mediaAnalysisVO.isHavingDuplicates()); }); return NullObject.instance(); }, jdbcTemplate); diff --git a/src/main/resources/sql/etl/mediaAnalysis.sql.st b/src/main/resources/sql/etl/mediaAnalysis.sql.st index 40193cb..5e913a5 100644 --- a/src/main/resources/sql/etl/mediaAnalysis.sql.st +++ b/src/main/resources/sql/etl/mediaAnalysis.sql.st @@ -1,3 +1,3 @@ insert into . 
(uuid, image_url, - is_valid_url, is_present_in_storage, is_thumbnail_generated) -VALUES (?, ?, ?, ?, ?); \ No newline at end of file + is_valid_url, is_present_in_storage, is_thumbnail_generated, is_having_duplicates) +VALUES (?, ?, ?, ?, ?, ?); \ No newline at end of file From 603c56cca5f68ab50095f4eb2414cae1b2e15bd0 Mon Sep 17 00:00:00 2001 From: himeshr Date: Wed, 24 Jul 2024 13:07:09 +0530 Subject: [PATCH 09/14] #102 | Fix build breaks due to test failures --- .../etl/repository/SchemaMetadataRepositoryTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/avniproject/etl/repository/SchemaMetadataRepositoryTest.java b/src/test/java/org/avniproject/etl/repository/SchemaMetadataRepositoryTest.java index ef55f16..96a4c42 100644 --- a/src/test/java/org/avniproject/etl/repository/SchemaMetadataRepositoryTest.java +++ b/src/test/java/org/avniproject/etl/repository/SchemaMetadataRepositoryTest.java @@ -33,7 +33,7 @@ public void before() { @Sql(scripts = {"/test-data-teardown.sql"}, executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD) public void shouldGetAllTablesForAnOrganisation() { SchemaMetadata schemaMetadata = schemaMetadataRepository.getNewSchemaMetadata(); - assertThat(schemaMetadata.getTableMetadata().size(), is(15)); + assertThat(schemaMetadata.getTableMetadata().size(), is(16)); } @Test @@ -83,7 +83,7 @@ public void shouldGetMediaTable() { @Sql({"/test-data-teardown.sql", "/test-data.sql"}) @Sql(scripts = {"/test-data-teardown.sql"}, executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD) public void shouldGetMediaAnalysisTable() { - SchemaMetadata schemaMetadata = schemaMetadataRepository.getExistingSchemaMetadata(); + SchemaMetadata schemaMetadata = schemaMetadataRepository.getNewSchemaMetadata(); Optional mediaAnalysis = schemaMetadata.getTableMetadata().stream().filter(tableMetadata1 -> tableMetadata1.getName().equals("media_analysis")).findFirst(); assertThat(mediaAnalysis.isPresent(), is(true)); From 6f00eb5cf6c1d7a85b60323aa4b85ec5edf1ae1b Mon Sep 17 00:00:00 2001 From: himeshr Date: Wed, 24 Jul 2024 18:54:44 +0530 Subject: [PATCH 10/14] #102 | Add reference etl internal-api postman collection --- .../api/AVNI ETL.postman_collection.json | 422 ++++++++++++++++++ 1 file changed, 422 insertions(+) create mode 100644 src/main/resources/api/AVNI ETL.postman_collection.json diff --git a/src/main/resources/api/AVNI ETL.postman_collection.json b/src/main/resources/api/AVNI ETL.postman_collection.json new file mode 100644 index 0000000..573a56e --- /dev/null +++ b/src/main/resources/api/AVNI ETL.postman_collection.json @@ -0,0 +1,422 @@ +{ + "info": { + "_postman_id": "7cd37fa2-d77d-4a7b-816a-4b26ed1d5d5a", + "name": "AVNI ETL", + "description": "Avni ETL Internal APIs Documentation", + "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json", + "_exporter_id": "20957992" + }, + "item": [ + { + "name": "Scheduler Status", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{url}}/scheduler", + "host": [ + "{{url}}" + ], + "path": [ + "scheduler" + ] + } + }, + "response": [] + }, + { + "name": "Ping", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{url}}/ping", + "host": [ + "{{url}}" + ], + "path": [ + "ping" + ] + } + }, + "response": [] + }, + { + "name": "Fetch MediaAnalysis Job statuses", + "request": { + "method": "POST", + "header": [], + "body": { + "mode": "raw", + "raw": "[\n\"{{organisation_uuid}}\"\n]", + "options": { + "raw": { + "language": "json" + } + } + 
}, + "url": { + "raw": "{{url}}/job/status?jobGroup=MediaAnalysis", + "host": [ + "{{url}}" + ], + "path": [ + "job", + "status" + ], + "query": [ + { + "key": "jobGroup", + "value": "MediaAnalysis" + } + ] + } + }, + "response": [] + }, + { + "name": "Fetch Sync Job statuses", + "request": { + "method": "POST", + "header": [], + "body": { + "mode": "raw", + "raw": "[\n\"{{organisation_uuid}}\"\n]", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{url}}/job/status?jobGroup=Sync", + "host": [ + "{{url}}" + ], + "path": [ + "job", + "status" + ], + "query": [ + { + "key": "jobGroup", + "value": "Sync" + } + ] + } + }, + "response": [] + }, + { + "name": "Fetch MediaAnalysis Job History", + "protocolProfileBehavior": { + "disableBodyPruning": true + }, + "request": { + "method": "GET", + "header": [], + "body": { + "mode": "raw", + "raw": "", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{url}}/job/history/{{organisation_uuid}}?jobGroup=MediaAnalysis", + "host": [ + "{{url}}" + ], + "path": [ + "job", + "history", + "{{organisation_uuid}}" + ], + "query": [ + { + "key": "jobGroup", + "value": "MediaAnalysis" + } + ] + } + }, + "response": [] + }, + { + "name": "Fetch Sync Job History", + "protocolProfileBehavior": { + "disableBodyPruning": true + }, + "request": { + "method": "GET", + "header": [], + "body": { + "mode": "raw", + "raw": "", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{url}}/job/history/{{organisation_uuid}}?jobGroup=Sync", + "host": [ + "{{url}}" + ], + "path": [ + "job", + "history", + "{{organisation_uuid}}" + ], + "query": [ + { + "key": "jobGroup", + "value": "Sync" + } + ] + } + }, + "response": [] + }, + { + "name": "Fetch MediaAnalysis Job status for org", + "protocolProfileBehavior": { + "disableBodyPruning": true + }, + "request": { + "method": "GET", + "header": [], + "body": { + "mode": "raw", + "raw": "", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{url}}/job/{{organisation_uuid}}?jobGroup=MediaAnalysis", + "host": [ + "{{url}}" + ], + "path": [ + "job", + "{{organisation_uuid}}" + ], + "query": [ + { + "key": "jobGroup", + "value": "MediaAnalysis" + } + ] + } + }, + "response": [] + }, + { + "name": "Fetch Sync Job status for org", + "protocolProfileBehavior": { + "disableBodyPruning": true + }, + "request": { + "method": "GET", + "header": [], + "body": { + "mode": "raw", + "raw": "", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{url}}/job/{{organisation_uuid}}?jobGroup=Sync", + "host": [ + "{{url}}" + ], + "path": [ + "job", + "{{organisation_uuid}}" + ], + "query": [ + { + "key": "jobGroup", + "value": "Sync" + } + ] + } + }, + "response": [] + }, + { + "name": "Create Job", + "request": { + "method": "POST", + "header": [], + "body": { + "mode": "raw", + "raw": "{\n \"entityUUID\": \"{{organisation_uuid}}\",\n \"jobEntityType\": \"Organisation\"\n}", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{url}}/job", + "host": [ + "{{url}}" + ], + "path": [ + "job" + ] + } + }, + "response": [] + }, + { + "name": "Create MediaAnalysis Job", + "request": { + "method": "POST", + "header": [], + "body": { + "mode": "raw", + "raw": "{\n \"entityUUID\": \"{{organisation_uuid}}\",\n \"jobEntityType\": \"Organisation\",\n \"jobGroup\": \"MediaAnalysis\"\n}", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{url}}/job", + "host": 
[ + "{{url}}" + ], + "path": [ + "job" + ] + } + }, + "response": [] + }, + { + "name": "Delete a Job", + "request": { + "method": "DELETE", + "header": [], + "body": { + "mode": "raw", + "raw": "", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{url}}/job/{{organisation_uuid}}?jobGroup=Sync", + "host": [ + "{{url}}" + ], + "path": [ + "job", + "{{organisation_uuid}}" + ], + "query": [ + { + "key": "jobGroup", + "value": "Sync" + } + ] + } + }, + "response": [] + }, + { + "name": "Delete a MediaAnalysis Job", + "request": { + "method": "DELETE", + "header": [], + "body": { + "mode": "raw", + "raw": "", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{url}}/job/{{organisation_uuid}}?jobGroup=MediaAnalysis", + "host": [ + "{{url}}" + ], + "path": [ + "job", + "{{organisation_uuid}}" + ], + "query": [ + { + "key": "jobGroup", + "value": "MediaAnalysis" + } + ] + } + }, + "response": [] + } + ], + "auth": { + "type": "apikey", + "apikey": [ + { + "key": "value", + "value": "{{token}}", + "type": "string" + }, + { + "key": "key", + "value": "AUTH-TOKEN", + "type": "string" + } + ] + }, + "event": [ + { + "listen": "prerequest", + "script": { + "type": "text/javascript", + "exec": [ + "" + ] + } + }, + { + "listen": "test", + "script": { + "type": "text/javascript", + "exec": [ + "" + ] + } + } + ], + "variable": [ + { + "key": "url", + "value": "http://localhost:8022", + "type": "string" + }, + { + "key": "organisation_uuid", + "value": "7e8f3e7f-1c9e-4a35-b6f4-fdd4eb30bfb3", + "type": "string" + } + ] +} \ No newline at end of file From e07cbf43478d60cb4a1a995d9eab7610d5317431 Mon Sep 17 00:00:00 2001 From: himeshr Date: Wed, 24 Jul 2024 19:31:19 +0530 Subject: [PATCH 11/14] #102 | Absorb code review comments, set trigger to once a day for MediaAnalysis job --- .../avniproject/etl/dto/MediaCompactDTO.java | 4 ++ .../etl/repository/MediaTableRepository.java | 4 +- .../service/MediaTableRepositoryService.java | 38 ++++++++++++------- .../MediaAnalysisTableRegenerateAction.java | 27 ++++++------- .../resources/main-application.properties | 2 +- 5 files changed, 43 insertions(+), 32 deletions(-) create mode 100644 src/main/java/org/avniproject/etl/dto/MediaCompactDTO.java diff --git a/src/main/java/org/avniproject/etl/dto/MediaCompactDTO.java b/src/main/java/org/avniproject/etl/dto/MediaCompactDTO.java new file mode 100644 index 0000000..6862649 --- /dev/null +++ b/src/main/java/org/avniproject/etl/dto/MediaCompactDTO.java @@ -0,0 +1,4 @@ +package org.avniproject.etl.dto; + +public record MediaCompactDTO (String compositeUUID, String entityUUID, String url){ +} diff --git a/src/main/java/org/avniproject/etl/repository/MediaTableRepository.java b/src/main/java/org/avniproject/etl/repository/MediaTableRepository.java index 77b6274..becfad7 100644 --- a/src/main/java/org/avniproject/etl/repository/MediaTableRepository.java +++ b/src/main/java/org/avniproject/etl/repository/MediaTableRepository.java @@ -88,9 +88,9 @@ public List getImageData(MediaSearchRequest mediaSearchRequest, Page return searchInternal(mediaSearchRequest, page, (rs, rowNum) -> mediaTableRepositoryService.setImageData(rs)); } - public List getAllMedia() { + public List getAllMedia() { Query query = new MediaSearchQueryBuilder().allWithoutAnyLimitOrOffset().build(); return runInSchemaUserContext(() -> new NamedParameterJdbcTemplate(jdbcTemplate) - .query(query.sql(), query.parameters(), (rs, rowNum) -> mediaTableRepositoryService.setMediaDto(rs, false)), 
jdbcTemplate); + .query(query.sql(), query.parameters(), (rs, rowNum) -> mediaTableRepositoryService.setMediaCompactDTO(rs)), jdbcTemplate); } } diff --git a/src/main/java/org/avniproject/etl/repository/service/MediaTableRepositoryService.java b/src/main/java/org/avniproject/etl/repository/service/MediaTableRepositoryService.java index 4ade50e..c0f0d03 100644 --- a/src/main/java/org/avniproject/etl/repository/service/MediaTableRepositoryService.java +++ b/src/main/java/org/avniproject/etl/repository/service/MediaTableRepositoryService.java @@ -1,9 +1,9 @@ package org.avniproject.etl.repository.service; -import org.apache.log4j.Logger; import org.avniproject.etl.config.AmazonClientService; import org.avniproject.etl.config.S3FileDoesNotExist; import org.avniproject.etl.dto.ImageData; +import org.avniproject.etl.dto.MediaCompactDTO; import org.avniproject.etl.dto.MediaDTO; import org.avniproject.etl.util.Utils; import org.springframework.stereotype.Service; @@ -23,29 +23,39 @@ public MediaTableRepositoryService(AmazonClientService amazonClientService) { this.amazonClientService = amazonClientService; } - public MediaDTO setMediaDto(ResultSet rs) { - return this.setMediaDto(rs, true); + public MediaCompactDTO setMediaCompactDTO(ResultSet rs) { + try { + String imageUrl = rs.getString("image_url"); + String uuid = rs.getString("uuid"); + String imageUUID = getImageUUID(imageUrl); + String compositeUUID = uuid + "#" + imageUUID; + return new MediaCompactDTO( + compositeUUID, + uuid, + imageUrl + ); + } catch (SQLException e) { + throw new RuntimeException(e); + } } - public MediaDTO setMediaDto(ResultSet rs, boolean generateSignedUrls) { + public MediaDTO setMediaDto(ResultSet rs) { try { String imageUrl = rs.getString("image_url"); String thumbnailUrl = Utils.getThumbnailUrl(imageUrl); URL signedImageUrl = null, signedThumbnailUrl = null; - if(generateSignedUrls) { + try { + signedImageUrl = amazonClientService.generateMediaDownloadUrl(imageUrl); try { - signedImageUrl = amazonClientService.generateMediaDownloadUrl(imageUrl); - try { - signedThumbnailUrl = amazonClientService.generateMediaDownloadUrl(thumbnailUrl); - } catch (S3FileDoesNotExist ignored) { - } - } catch (IllegalArgumentException illegalArgumentException) { - //Ignore and move on. Image will be null - } catch (S3FileDoesNotExist e) { - throw new RuntimeException(e); + signedThumbnailUrl = amazonClientService.generateMediaDownloadUrl(thumbnailUrl); + } catch (S3FileDoesNotExist ignored) { } + } catch (IllegalArgumentException illegalArgumentException) { + //Ignore and move on. 
Image will be null + } catch (S3FileDoesNotExist e) { + throw new RuntimeException(e); } String uuid = rs.getString("uuid"); diff --git a/src/main/java/org/avniproject/etl/repository/sync/MediaAnalysisTableRegenerateAction.java b/src/main/java/org/avniproject/etl/repository/sync/MediaAnalysisTableRegenerateAction.java index 60abd1f..bada19e 100644 --- a/src/main/java/org/avniproject/etl/repository/sync/MediaAnalysisTableRegenerateAction.java +++ b/src/main/java/org/avniproject/etl/repository/sync/MediaAnalysisTableRegenerateAction.java @@ -7,7 +7,7 @@ import org.avniproject.etl.domain.Organisation; import org.avniproject.etl.domain.metadata.TableMetadata; import org.avniproject.etl.dto.MediaAnalysisVO; -import org.avniproject.etl.dto.MediaDTO; +import org.avniproject.etl.dto.MediaCompactDTO; import org.avniproject.etl.repository.MediaTableRepository; import org.avniproject.etl.repository.sql.SqlFile; import org.avniproject.etl.service.MediaAnalysisService; @@ -35,7 +35,6 @@ public class MediaAnalysisTableRegenerateAction { public static final String SCHEMA_NAME = "schemaName"; public static final String MEDIA_ANALYSIS_TABLE = "mediaAnalysisTable"; public static final int INT_CONSTANT_ZERO = 0; - public static final String COMPOSITE_UUID_SEPARATOR = "#"; public static final int INT_CONSTANT_ONE = 1; private final AmazonClientService amazonClientService; @@ -55,20 +54,17 @@ public MediaAnalysisTableRegenerateAction(AmazonClientServic public void process(Organisation organisation, TableMetadata tableMetadata) { List<String> listOfAllMediaUrls = fetchValidMediaUrlsFromStorage(organisation); - Map<Boolean, List<String>> partitionResults = partitionListBasedOnThumbnailsPattern(listOfAllMediaUrls); - List<String> listOfAllThumbnailsUrls = partitionResults.get(Boolean.TRUE); - List<String> listOfAllMediaUrlsExcludingThumbnails = partitionResults.get(Boolean.FALSE); - log.info(String.format("listOfAllMediaUrls %d listOfAllMediaUrlsExcludingThumbnails %d listOfAllThumbnailsUrls %d", listOfAllMediaUrls.size(), listOfAllMediaUrlsExcludingThumbnails.size(), listOfAllThumbnailsUrls.size())); + Map<Boolean, Map<String, String>> partitionResults = partitionListBasedOnThumbnailsPattern(listOfAllMediaUrls); + Map<String, String> thumbnailUrlsMap = partitionResults.get(Boolean.TRUE); + Map<String, String> mediaUrlsMap = partitionResults.get(Boolean.FALSE); String orgMediaDirectory = organisation.getOrganisationIdentity().getMediaDirectory(); - List<MediaDTO> listOfMediaDTOEntities = mediaTableRepository.getAllMedia(); - Map<String, String> mediaUrlsMap = listOfAllMediaUrlsExcludingThumbnails.stream().collect(Collectors.toMap(mediaUrl -> mediaUrl.substring(mediaUrl.lastIndexOf(STRING_CONST_SEPARATOR)), Function.identity())); - Map<String, String> thumbnailUrlsMap = listOfAllThumbnailsUrls.stream().collect(Collectors.toMap(thumbnailUrl -> thumbnailUrl.substring(thumbnailUrl.lastIndexOf(STRING_CONST_SEPARATOR)), Function.identity())); + List<MediaCompactDTO> listOfMediaDTOEntities = mediaTableRepository.getAllMedia(); - Map<String, List<MediaDTO>> groupedMediaEntityMap = listOfMediaDTOEntities.stream() .collect(Collectors.groupingBy(mediaDTO -> mediaDTO.uuid())); //mediaDTO.uuid() returns a composite uuid of entity.uuid#media.uuid + Map<String, List<MediaCompactDTO>> groupedMediaEntityMap = listOfMediaDTOEntities.stream() + .collect(Collectors.groupingBy(mediaDTO -> mediaDTO.compositeUUID())); List<MediaAnalysisVO> mediaAnalysisVOS = groupedMediaEntityMap.entrySet().stream().map(groupedMediaEntityMapEntry -> { - MediaDTO mediaDTO = groupedMediaEntityMapEntry.getValue().get(INT_CONSTANT_ZERO); + MediaCompactDTO mediaDTO = groupedMediaEntityMapEntry.getValue().get(INT_CONSTANT_ZERO); boolean isPresentInStorage = false,
isThumbnailGenerated = false; boolean isValidUrl = mediaDTO.url().contains(orgMediaDirectory); if (isValidUrl) { @@ -76,7 +72,7 @@ public void process(Organisation organisation, TableMetadata tableMetadata) { isPresentInStorage = mediaUrlsMap.containsKey(urlToSearch); isThumbnailGenerated = thumbnailUrlsMap.containsKey(urlToSearch); } - return new MediaAnalysisVO(mediaDTO.uuid().substring(INT_CONSTANT_ZERO,mediaDTO.uuid().indexOf(COMPOSITE_UUID_SEPARATOR)), + return new MediaAnalysisVO(mediaDTO.entityUUID(), mediaDTO.url(), isValidUrl, isPresentInStorage, isThumbnailGenerated, groupedMediaEntityMapEntry.getValue().size() > INT_CONSTANT_ONE); }).collect(Collectors.toList()); @@ -111,9 +107,10 @@ private void filterOutNonMediaUrls(List<String> listOfAllMediaUrls) { listOfAllMediaUrls.removeIf(fastSyncAndAdhocDumpPatternPredicate.or(notUUIDPatternPredicate)); } - private Map<Boolean, List<String>> partitionListBasedOnThumbnailsPattern(List<String> listOfAllMediaUrls) { + private Map<Boolean, Map<String, String>> partitionListBasedOnThumbnailsPattern(List<String> listOfAllMediaUrls) { Predicate<String> thumbnailsPatternPredicate = Pattern.compile(THUMBNAILS_PATTERN, Pattern.CASE_INSENSITIVE).asPredicate(); - Map<Boolean, List<String>> partitionResults= listOfAllMediaUrls.stream().collect(Collectors.partitioningBy(thumbnailsPatternPredicate)); + Map<Boolean, Map<String, String>> partitionResults= listOfAllMediaUrls.stream().collect(Collectors.partitioningBy(thumbnailsPatternPredicate, + Collectors.toMap(url -> url.substring(url.lastIndexOf(STRING_CONST_SEPARATOR)), Function.identity()))); return partitionResults; } diff --git a/src/main/resources/main-application.properties b/src/main/resources/main-application.properties index f8d726c..aade73e 100644 --- a/src/main/resources/main-application.properties +++ b/src/main/resources/main-application.properties @@ -17,7 +17,7 @@ spring.quartz.properties.org.quartz.jobStore.misfireThreshold = ${AVNI_SCHEDULED # Internal Scheduler config avni.scheduledJob.sync.repeatIntervalInMinutes=${AVNI_SCHEDULED_JOB_REPEAT_INTERVAL_IN_MINUTES:90} -avni.scheduledJob.mediaAnalysis.repeatIntervalInMinutes=${AVNI_MEDIA_ANALYSIS_JOB_REPEAT_INTERVAL_IN_MINUTES:2} +avni.scheduledJob.mediaAnalysis.repeatIntervalInMinutes=${AVNI_MEDIA_ANALYSIS_JOB_REPEAT_INTERVAL_IN_MINUTES:1440} #S3 Parameters avni.bucket.name=${OPENCHS_BUCKET_NAME:dummy} From a43d38e935f2abbad3bbb3b5204caaeb7444fdcc Mon Sep 17 00:00:00 2001 From: himeshr Date: Thu, 25 Jul 2024 18:27:29 +0530 Subject: [PATCH 12/14] avniproject/avni-media#177 | Avoid throwing exception if an image is missing, otherwise it'll break the media-viewer app --- .../repository/service/MediaTableRepositoryService.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/avniproject/etl/repository/service/MediaTableRepositoryService.java b/src/main/java/org/avniproject/etl/repository/service/MediaTableRepositoryService.java index c0f0d03..863e8d1 100644 --- a/src/main/java/org/avniproject/etl/repository/service/MediaTableRepositoryService.java +++ b/src/main/java/org/avniproject/etl/repository/service/MediaTableRepositoryService.java @@ -50,12 +50,11 @@ public MediaDTO setMediaDto(ResultSet rs) { signedImageUrl = amazonClientService.generateMediaDownloadUrl(imageUrl); try { signedThumbnailUrl = amazonClientService.generateMediaDownloadUrl(thumbnailUrl); - } catch (S3FileDoesNotExist ignored) { + } catch (IllegalArgumentException | S3FileDoesNotExist exception) { + //Ignore and move on.
Thumbnail will be broken } - } catch (IllegalArgumentException illegalArgumentException) { + } catch (IllegalArgumentException | S3FileDoesNotExist exception) { //Ignore and move on. Image will be null - } catch (S3FileDoesNotExist e) { - throw new RuntimeException(e); } String uuid = rs.getString("uuid"); From e87ec9df7b78918af3f7c3481084f3bcbdcc2611 Mon Sep 17 00:00:00 2001 From: himeshr Date: Fri, 26 Jul 2024 12:09:30 +0530 Subject: [PATCH 13/14] avniproject/avni-media#177 | Enable running ETL using staging idp and s3 config --- Makefile | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/Makefile b/Makefile index ce88d14..462f786 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,20 @@ debug_server: build_server debug_server_with_dump_data_org: build_server OPENCHS_DATABASE_NAME=avni_org OPENCHS_CLIENT_ID=dummy OPENCHS_KEYCLOAK_CLIENT_SECRET=dummy AVNI_IDP_TYPE=none java -Xmx2048m -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005 -jar ./build/libs/etl-1.0.0-SNAPSHOT.jar + +start_server_staging: build_server + -mkdir -p /tmp/avni-etl-service && sudo ln -s /tmp/avni-etl-service /var/log/avni-etl-service + + AVNI_IDP_TYPE=cognito \ + OPENCHS_CLIENT_ID=$(OPENCHS_STAGING_APP_CLIENT_ID) \ + OPENCHS_USER_POOL=$(OPENCHS_STAGING_USER_POOL_ID) \ + OPENCHS_IAM_USER=$(OPENCHS_STAGING_IAM_USER) \ + OPENCHS_IAM_USER_ACCESS_KEY=$(OPENCHS_STAGING_IAM_USER_ACCESS_KEY) \ + OPENCHS_IAM_USER_SECRET_ACCESS_KEY=$(OPENCHS_STAGING_IAM_USER_SECRET_ACCESS_KEY) \ + OPENCHS_BUCKET_NAME=staging-user-media \ + OPENCHS_DATABASE_URL=jdbc:postgresql://localhost:6015/openchs \ + java -jar ./build/libs/etl-1.0.0-SNAPSHOT.jar + boot_run: ./gradlew bootRun From 4a08b57620c4a0104fdcdfa890f69569a28d8dc0 Mon Sep 17 00:00:00 2001 From: himeshr Date: Thu, 8 Aug 2024 11:03:42 +0530 Subject: [PATCH 14/14] avniproject/avni-webapp#1305 | Include OrgUUIDs which do not have ETL running in getStatus response --- .../etl/service/backgroundJob/ScheduledJobService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/avniproject/etl/service/backgroundJob/ScheduledJobService.java b/src/main/java/org/avniproject/etl/service/backgroundJob/ScheduledJobService.java index 4a8314a..9c4af98 100644 --- a/src/main/java/org/avniproject/etl/service/backgroundJob/ScheduledJobService.java +++ b/src/main/java/org/avniproject/etl/service/backgroundJob/ScheduledJobService.java @@ -26,7 +26,7 @@ public class ScheduledJobService { private static final String HISTORY_QUERY = "select sjr.started_at, sjr.ended_at, sjr.error_message, sjr.success from qrtz_job_details qjd\n" + " left outer join scheduled_job_run sjr on sjr.job_name = qjd.job_name\n" + " where qjd.job_name = ? and qjd.job_group = ?" + "order by 1 desc\n"; - private static final String JOB_LIST_QUERY = "select organisationUUID, job_name from (SELECT unnest(string_to_array(?, ',')) as organisationUUID) foo\n" + " left outer join qrtz_job_details qjd on organisationUUID = qjd.job_name where qjd.job_group = ?"; + private static final String JOB_LIST_QUERY = "select organisationUUID, job_name from (SELECT unnest(string_to_array(?, ',')) as organisationUUID) foo\n" + " left outer join qrtz_job_details qjd on organisationUUID = qjd.job_name where qjd.job_group = ? or qjd is null"; @Autowired public ScheduledJobService(JdbcTemplate jdbcTemplate, Scheduler scheduler, ScheduledJobConfig scheduledJobConfig) {
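
A note on the JOB_LIST_QUERY change in PATCH 14/14 above: with a left outer join, keeping the filter "qjd.job_group = ?" in the WHERE clause silently drops organisations that have no row in qrtz_job_details at all, because every qjd column is NULL for those unmatched rows and the comparison never evaluates to true. Adding "or qjd is null" keeps them, so the getStatus response can also list organisations whose ETL job has never been scheduled. The sketch below is illustrative only and is not taken from the repository; it uses simplified parameter names and shows an equivalent formulation that moves the group filter into the join condition instead.

    -- Hypothetical, simplified shape of the jobs-status lookup (not the project's exact SQL).
    -- Organisations without a matching qrtz_job_details row survive the left join;
    -- their job_name comes back NULL, which is what "or qjd is null" preserves in the patch above.
    select orgs.organisation_uuid, qjd.job_name
    from (select unnest(string_to_array(:organisation_uuids, ',')) as organisation_uuid) orgs
    left outer join qrtz_job_details qjd
           on qjd.job_name = orgs.organisation_uuid
          and qjd.job_group = :job_group;

One subtle difference between the two formulations: the WHERE-based version still drops an organisation whose only scheduled job belongs to the other job group (the join matches, so qjd is not null, but the group test fails), whereas filtering in the ON clause keeps it with a NULL job_name. Which behaviour is wanted depends on how getStatus is expected to treat such organisations.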