diff --git a/pom.xml b/pom.xml index 249887c..72099a0 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ bio.overture song-search - 2.8.0 + 2.9.0 song-search GQL microservice for searching maestro generated song indexes @@ -73,6 +73,13 @@ test + + + org.springframework.kafka + spring-kafka + 2.8.4 + + com.graphql-java diff --git a/src/main/java/bio/overture/songsearch/config/constants/SearchFields.java b/src/main/java/bio/overture/songsearch/config/constants/SearchFields.java index c3cb09e..73119e2 100644 --- a/src/main/java/bio/overture/songsearch/config/constants/SearchFields.java +++ b/src/main/java/bio/overture/songsearch/config/constants/SearchFields.java @@ -26,6 +26,8 @@ public class SearchFields { public static final String ANALYSIS_ID = "analysisId"; public static final String ANALYSIS_TYPE = "analysisType"; + + public static final String REPOSITORY_CODE = "code"; public static final String ANALYSIS_VERSION = "analysisVersion"; public static final String ANALYSIS_STATE = "analysisState"; public static final String FILE_OBJECT_ID = "objectId"; diff --git a/src/main/java/bio/overture/songsearch/config/kafka/AnalysisMessage.java b/src/main/java/bio/overture/songsearch/config/kafka/AnalysisMessage.java new file mode 100644 index 0000000..a64d9f1 --- /dev/null +++ b/src/main/java/bio/overture/songsearch/config/kafka/AnalysisMessage.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2018. Ontario Institute for Cancer Research + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package bio.overture.songsearch.config.kafka; + +import static lombok.AccessLevel.PRIVATE; + +import bio.overture.songsearch.model.enums.AnalysisState; +import lombok.*; + +@Value +// Note: although the AllArgs and NoArgs combination below seems odd, +// it allows Jackson to deserialize to an immutable object without using any additional annotations. +@AllArgsConstructor +@NoArgsConstructor(force = true, access = PRIVATE) +public class AnalysisMessage { + + @NonNull private final String analysisId; + @NonNull private final String studyId; + @NonNull private final String state; + @NonNull private final String songServerId; + @NonNull private final Analysis analysis; + + public static AnalysisMessage createAnalysisMessage( + bio.overture.songsearch.model.Analysis analysis, String songServerId) { + + return new AnalysisMessage( + analysis.getAnalysisId(), + analysis.getStudyId(), + analysis.getAnalysisState().toString(), + analysis.getRepositories().get(0).getCode(), + new Analysis( + analysis.getAnalysisId(), + new AnalysisType(analysis.getAnalysisType()), + analysis.getAnalysisState(), + analysis.getStudyId())); + } + + @Value + @AllArgsConstructor + private static class Analysis { + String analysisId; + AnalysisType analysisType; + AnalysisState analysisState; + String studyId; + } + + @Data + @AllArgsConstructor + public static class AnalysisType { + String name; + } +} diff --git a/src/main/java/bio/overture/songsearch/config/kafka/DefaultSender.java b/src/main/java/bio/overture/songsearch/config/kafka/DefaultSender.java new file mode 100644 index 0000000..8e055b9 --- /dev/null +++ b/src/main/java/bio/overture/songsearch/config/kafka/DefaultSender.java @@ -0,0 +1,13 @@ +package bio.overture.songsearch.config.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class DefaultSender implements Sender { + + public void send(String payload, String key) { + log.debug("key: " + key); + } +} diff --git a/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java b/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java new file mode 100644 index 0000000..ba25dee --- /dev/null +++ b/src/main/java/bio/overture/songsearch/config/kafka/KafkaConfig.java @@ -0,0 +1,52 @@ +package bio.overture.songsearch.config.kafka; + +import java.util.HashMap; +import java.util.Map; +import lombok.val; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +@Configuration +@Profile("kafka") +public class KafkaConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.template.automation-trigger}") + private String automation_trigger; + + @Bean + public Map producerConfigs() { + val props = new HashMap(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return props; + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + val template = new KafkaTemplate<>(producerFactory()); + template.setDefaultTopic(automation_trigger); + return template; + } + + @Bean + public KafkaSender sender() { + return new KafkaSender(); + } +} diff --git a/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java b/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java new file mode 100644 index 0000000..b5ebbeb --- /dev/null +++ b/src/main/java/bio/overture/songsearch/config/kafka/KafkaSender.java @@ -0,0 +1,17 @@ +package bio.overture.songsearch.config.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.core.KafkaTemplate; + +@Slf4j +@Profile("kafka") +public class KafkaSender implements Sender { + @Autowired private KafkaTemplate kafkaTemplate; + + public void send(String payload, String key) { + log.debug("sending payload='{}' to topic='{}'", payload, kafkaTemplate.getDefaultTopic()); + kafkaTemplate.send(kafkaTemplate.getDefaultTopic(), key, payload); + } +} diff --git a/src/main/java/bio/overture/songsearch/config/kafka/Sender.java b/src/main/java/bio/overture/songsearch/config/kafka/Sender.java new file mode 100644 index 0000000..c77afb5 --- /dev/null +++ b/src/main/java/bio/overture/songsearch/config/kafka/Sender.java @@ -0,0 +1,5 @@ +package bio.overture.songsearch.config.kafka; + +public interface Sender { + void send(String payload, String key); +} diff --git a/src/main/java/bio/overture/songsearch/graphql/GraphQLProvider.java b/src/main/java/bio/overture/songsearch/graphql/GraphQLProvider.java index ad4130c..8f1a312 100644 --- a/src/main/java/bio/overture/songsearch/graphql/GraphQLProvider.java +++ b/src/main/java/bio/overture/songsearch/graphql/GraphQLProvider.java @@ -52,6 +52,8 @@ public class GraphQLProvider { private final AnalysisDataFetcher analysisDataFetcher; private final FileDataFetcher fileDataFetcher; private final EntityDataFetcher entityDataFetcher; + + private final StartAutomationMutation startAutomationMutation; private final AuthProperties authProperties; private GraphQL graphQL; private GraphQLSchema graphQLSchema; @@ -61,10 +63,12 @@ public GraphQLProvider( AnalysisDataFetcher analysisDataFetcher, FileDataFetcher fileDataFetcher, EntityDataFetcher entityDataFetcher, + StartAutomationMutation startAutomationMutation, AuthProperties authProperties) { this.analysisDataFetcher = analysisDataFetcher; this.fileDataFetcher = fileDataFetcher; this.entityDataFetcher = entityDataFetcher; + this.startAutomationMutation = startAutomationMutation; this.authProperties = authProperties; } @@ -147,6 +151,9 @@ private RuntimeWiring buildWiring() { .dataFetcher( "sampleMatchedAnalysesForDonor", analysisDataFetcher.getSampleMatchedAnalysesForDonorFetcher())) + .type( + newTypeWiring("Mutation") + .dataFetcher("startAutomation", startAutomationMutation.startAutomationResolver())) .build(); } diff --git a/src/main/java/bio/overture/songsearch/graphql/StartAutomationMutation.java b/src/main/java/bio/overture/songsearch/graphql/StartAutomationMutation.java new file mode 100644 index 0000000..c5604d6 --- /dev/null +++ b/src/main/java/bio/overture/songsearch/graphql/StartAutomationMutation.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2021 The Ontario Institute for Cancer Research. All rights reserved + * + * This program and the accompanying materials are made available under the terms of the GNU Affero General Public License v3.0. + * You should have received a copy of the GNU Affero General Public License along with + * this program. If not, see . + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER + * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package bio.overture.songsearch.graphql; + +import static bio.overture.songsearch.config.kafka.AnalysisMessage.createAnalysisMessage; +import static bio.overture.songsearch.utils.JacksonUtils.convertValue; +import static java.util.stream.Collectors.toUnmodifiableList; + +import bio.overture.songsearch.config.kafka.Sender; +import bio.overture.songsearch.model.Analysis; +import bio.overture.songsearch.model.AutomationMutationResponse; + +import bio.overture.songsearch.model.Sort; +import bio.overture.songsearch.service.AnalysisService; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import graphql.schema.DataFetcher; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import lombok.NonNull; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import lombok.val; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class StartAutomationMutation { + + private final AnalysisService analysisService; + + @Value("${songServerId") + private String songServerId; + + private final Sender sender; + + @Autowired + public StartAutomationMutation(AnalysisService analysisService, @NonNull Sender sender) { + this.analysisService = analysisService; + this.sender = sender; + } + + public DataFetcher startAutomationResolver() { + return env -> { + val args = env.getArguments(); + + val filter = ImmutableMap.builder(); + val page = ImmutableMap.builder(); + val sorts = ImmutableList.builder(); + + if (args != null) { + if (args.get("filter") != null) filter.putAll((Map) args.get("filter")); + if (args.get("page") != null) page.putAll((Map) args.get("page")); + if (args.get("sorts") != null) { + val rawSorts = (List) args.get("sorts"); + sorts.addAll( + rawSorts.stream() + .map(sort -> convertValue(sort, Sort.class)) + .collect(toUnmodifiableList())); + } + } + + Analysis analysis = + analysisService.getAnalysisById(env.getArguments().get("analysisId").toString()); + log.debug("Analysis fetched: " + analysis); + + if (Objects.isNull(analysis)) { + log.debug("Analysis not found."); + AutomationMutationResponse response = + new AutomationMutationResponse(analysis, "Analysis not found"); + return response; + } + AutomationMutationResponse response = + new AutomationMutationResponse(analysis, "payload sent"); + sendAnalysisMessage(analysis); + log.debug("Message sent to kafka queue"); + return response; + }; + } + + @SneakyThrows + public void sendAnalysisMessage(Analysis analysis) { + val message = createAnalysisMessage(analysis, songServerId); + log.debug("Message payload:: " + new ObjectMapper().writeValueAsString(message)); + sender.send(new ObjectMapper().writeValueAsString(message), message.getAnalysisId()); + } +} diff --git a/src/main/java/bio/overture/songsearch/model/AutomationMutationResponse.java b/src/main/java/bio/overture/songsearch/model/AutomationMutationResponse.java new file mode 100644 index 0000000..0a5fdcd --- /dev/null +++ b/src/main/java/bio/overture/songsearch/model/AutomationMutationResponse.java @@ -0,0 +1,10 @@ +package bio.overture.songsearch.model; + +import lombok.Value; + +@Value +public class AutomationMutationResponse { + + private Analysis analysis; + private String message; +} diff --git a/src/main/java/bio/overture/songsearch/repository/AnalysisRepository.java b/src/main/java/bio/overture/songsearch/repository/AnalysisRepository.java index d0309a2..7584f07 100644 --- a/src/main/java/bio/overture/songsearch/repository/AnalysisRepository.java +++ b/src/main/java/bio/overture/songsearch/repository/AnalysisRepository.java @@ -80,6 +80,13 @@ private static Map>> argumentPa .put(ANALYSIS_STATE, value -> new TermQueryBuilder("analysis_state", value)) .put(STUDY_ID, value -> new TermQueryBuilder("study_id", value)) .put(RUN_ID, value -> new TermQueryBuilder("workflow.run_id", value)) + .put( + REPOSITORY_CODE, + value -> + new NestedQueryBuilder( + "repositories", + new TermQueryBuilder("repositories.code", value), + ScoreMode.None)) .put( DONOR_ID, value -> diff --git a/src/main/java/bio/overture/songsearch/service/AnalysisService.java b/src/main/java/bio/overture/songsearch/service/AnalysisService.java index 14b687f..b1501d0 100644 --- a/src/main/java/bio/overture/songsearch/service/AnalysisService.java +++ b/src/main/java/bio/overture/songsearch/service/AnalysisService.java @@ -35,6 +35,7 @@ import lombok.NonNull; import lombok.SneakyThrows; import lombok.Value; +import lombok.extern.slf4j.Slf4j; import lombok.val; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.search.SearchHit; @@ -42,12 +43,14 @@ import org.springframework.stereotype.Service; @Service +@Slf4j public class AnalysisService { private final AnalysisRepository analysisRepository; @Autowired public AnalysisService(AnalysisRepository analysisRepository) { + this.analysisRepository = analysisRepository; } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 732b52e..693c5ea 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -8,15 +8,26 @@ server: port: 8080 elastic: + host: workflow-es.rdpc-qa.cancercollaboratory.org #https://workflow-es.rdpc-qa.cancercollaboratory.org:443 #workflow-es.rdpc.cancercollaboratory.org #workflow-es.rdpc-qa.cancercollaboratory.org #workflow-es.rdpc-dev.cancercollaboratory.org #localhost + port: 443 #9200 + useHttps: true #false + useAuthentication: true #false + username: elastic + password: h5bdgjfh4gdqhkdcjdj5hj59 #vt92j5r5mcwgxwfvjjrc9k5d #h5bdgjfh4gdqhkdcjdj5hj59 #T8e5X9Jn5pF6rX8tCLNCEzJZ #testing + analysisCentricIndex: analysis_centric + fileCentricIndex: file_centric + +elasticX: host: localhost port: 9200 useHttps: false useAuthentication: false username: elastic - password: testing + password: analysisCentricIndex: analysis_centric fileCentricIndex: file_centric + song-search: workflowRunParameterKeys: analysisId: @@ -27,10 +38,28 @@ song-search: --- spring.config.activate.on-profile: secure auth: - jwtPublicKeyUrl: "http://localhost:8081/oauth/token/public_key" + jwtPublicKeyUrl: "https://ego.dev.argo.cancercollaboratory.org/api/oauth/token/public_key" #"http://localhost:8081/oauth/token/public_key" jwtPublicKeyStr: "-----BEGIN PUBLIC KEY-----\nSET ME IF YOU DONT HAVE A URL, BUT URL TAKES PRIORITY\n-----END PUBLIC KEY-----" graphqlScopes: - queryOnly: - - RDPC-DEV.READ - queryAndMutation: - - RDPC-DEV.WRITE + queryOnly: + - RDPC-DEV.READ + - RDPC-CA.READ + queryAndMutation: + - RDPC-DEV.WRITE + - RDPC-CA.WRITE +--- + +spring: + config: + activate: + on-profile: kafka + kafka: + bootstrap-servers: localhost:9092 + template: + automation-trigger: song-analysis + +songServerId: submission-song.collab +--- +#spring: +# profiles: +# active: secure \ No newline at end of file diff --git a/src/main/resources/schema.graphql b/src/main/resources/schema.graphql index f808e4a..f3066ef 100644 --- a/src/main/resources/schema.graphql +++ b/src/main/resources/schema.graphql @@ -126,6 +126,7 @@ input AnalysisFilter { sampleId: String sampleType: String runId: String + code: String } input AnalysisFileFilter { @@ -250,3 +251,12 @@ extend type Query { """ sampleMatchedAnalysesForDonor(req: SampleMatchedAnalysesForDonorReq!): [SampleMatchedAnalysisPair] } + +type Mutation { + startAutomation(analysisId: String!): MutationResponse +} + +type MutationResponse { + analysis: Analysis + message: String +} \ No newline at end of file