Skip to content

Commit

Permalink
Check for deposit status processor for status update
Browse files Browse the repository at this point in the history
  • Loading branch information
rpoet-jh committed Feb 20, 2024
1 parent c66f33f commit 210aedb
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void accept(Deposit deposit) {
// if result is still intermediate, add Deposit to queue for processing?

// Determine the logical success or failure of the Deposit, and persist the Deposit and RepositoryCopy
depositHelper.processDepositStatus(deposit.getId());
depositHelper.processDepositStatus(deposit);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import static java.lang.System.identityHashCode;
import static org.eclipse.deposit.util.loggers.Loggers.WORKERS_LOGGER;

import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Predicate;

import org.apache.commons.lang3.StringUtils;
import org.eclipse.pass.deposit.DepositServiceErrorHandler;
import org.eclipse.pass.deposit.DepositServiceRuntimeException;
import org.eclipse.pass.deposit.RemedialDepositException;
Expand Down Expand Up @@ -69,9 +71,6 @@ public class DepositTaskHelper {
"Missing Packager for Repository named '{}', marking Deposit as " +
"FAILED.";
private static final String PRECONDITION_FAILED = "Refusing to update {}, the following pre-condition failed: ";
private static final String ERR_RESOLVE_REPOSITORY = "Unable to resolve Repository Configuration for Repository " +
"%s (%s). Verify the Deposit Services runtime configuration" +
" location and " + "content.";
private static final String ERR_PARSING_STATUS_DOC = "Failed to update deposit status for [%s], parsing the " +
"status document referenced by %s failed: %s";
private static final String ERR_MAPPING_STATUS = "Failed to update deposit status for [%s], mapping the status " +
Expand Down Expand Up @@ -146,20 +145,69 @@ public void submitDeposit(Submission submission, DepositSubmission depositSubmis
}
}

void processDepositStatus(String depositId) {
void processDepositStatus(Deposit deposit) {
try {
Repository repository = passClient.getObject(deposit.getRepository());
RepositoryConfig repositoryConfig = repositories.getConfig(repository.getRepositoryKey());
getDepositStatusProcessor(repositoryConfig)
.ifPresentOrElse(
depositStatusProcessor ->
updateDepositStatusIfNeeded(deposit, repositoryConfig, depositStatusProcessor),
() -> LOG.info("No deposit status processor found for Deposit {}, status not updated",
deposit.getId())
);
} catch (IOException e) {
LOG.error("Failed to update Deposit Status {}", deposit.getId(), e);
}
}

void updateDepositRepositoryCopyStatus(Deposit deposit) {
try {
RepositoryCopy repoCopy = passClient.getObject(deposit.getRepositoryCopy());
switch (deposit.getDepositStatus()) {
case ACCEPTED -> {
LOG.debug("Deposit {} was accepted.", deposit.getId());
repoCopy.setCopyStatus(CopyStatus.COMPLETE);
passClient.updateObject(repoCopy);
}
case REJECTED -> {
LOG.debug("Deposit {} was rejected.", deposit.getId());
repoCopy.setCopyStatus(CopyStatus.REJECTED);
passClient.updateObject(repoCopy);
}
default -> {
}
}
} catch (Exception e) {
String msg = String.format(ERR_UPDATE_REPOCOPY, deposit.getRepositoryCopy(), deposit.getId());
throw new DepositServiceRuntimeException(msg, e, deposit);
}
}

private Optional<DepositStatusProcessor> getDepositStatusProcessor(RepositoryConfig repositoryConfig) {
if (Objects.isNull(repositoryConfig)
|| Objects.isNull(repositoryConfig.getRepositoryDepositConfig())
|| Objects.isNull(repositoryConfig.getRepositoryDepositConfig().getDepositProcessing())
|| Objects.isNull(repositoryConfig.getRepositoryDepositConfig().getDepositProcessing().getProcessor())) {
return Optional.empty();
}
return Optional.of(repositoryConfig.getRepositoryDepositConfig().getDepositProcessing().getProcessor());
}

private void updateDepositStatusIfNeeded(Deposit deposit, RepositoryConfig repositoryConfig,
DepositStatusProcessor depositStatusProcessor) {
CriticalResult<Deposit, Deposit> cr = cri.performCritical(
depositId, Deposit.class,
deposit.getId(), Deposit.class,
DepositStatusCriFunc.precondition(),
DepositStatusCriFunc.postcondition(),
DepositStatusCriFunc.critical(repositories, passClient)
DepositStatusCriFunc.critical(repositoryConfig, depositStatusProcessor)
);

if (!cr.success()) {
if (cr.throwable().isPresent()) {
Throwable t = cr.throwable().get();
if (t instanceof RemedialDepositException) {
LOG.error(format("Failed to update Deposit %s", depositId), t);
LOG.error(format("Failed to update Deposit %s", deposit.getId()), t);
return;
}

Expand All @@ -169,49 +217,18 @@ void processDepositStatus(String depositId) {

if (cr.resource().isPresent()) {
throw new DepositServiceRuntimeException(
format("Failed to update Deposit %s: %s", depositId, t.getMessage()),
format("Failed to update Deposit %s: %s", deposit.getId(), t.getMessage()),
t, cr.resource().get());
}
}

LOG.debug(format("Failed to update Deposit %s: no cause was present, probably a pre- or post-condition " +
"was not satisfied.", depositId));
"was not satisfied.", deposit.getId()));
return;
}

cr.result().ifPresent(this::updateDepositRepositoryCopyStatus);
LOG.info("Successfully processed Deposit {}", depositId);
}

RepositoryCopy updateDepositRepositoryCopyStatus(Deposit deposit) {
try {
RepositoryCopy repoCopy = passClient.getObject(deposit.getRepositoryCopy());
switch (deposit.getDepositStatus()) {
case ACCEPTED -> {
LOG.debug("Deposit {} was accepted.", deposit.getId());
repoCopy.setCopyStatus(CopyStatus.COMPLETE);
passClient.updateObject(repoCopy);
}
case REJECTED -> {
LOG.debug("Deposit {} was rejected.", deposit.getId());
repoCopy.setCopyStatus(CopyStatus.REJECTED);
passClient.updateObject(repoCopy);
}
default -> {
}
}
return repoCopy;
} catch (Exception e) {
String msg = String.format(ERR_UPDATE_REPOCOPY, deposit.getRepositoryCopy(), deposit.getId());
throw new DepositServiceRuntimeException(msg, e, deposit);
}
}

static Optional<RepositoryConfig> lookupConfig(Repository passRepository, Repositories repositories) {
if (passRepository.getRepositoryKey() != null) {
return Optional.of(repositories.getConfig(passRepository.getRepositoryKey()));
}
return Optional.empty();
LOG.info("Successfully processed Deposit {}", deposit.getId());
}

static class DepositStatusCriFunc {
Expand All @@ -233,7 +250,7 @@ static Predicate<Deposit> precondition() {
return false;
}

if (deposit.getDepositStatusRef() == null || deposit.getDepositStatusRef().trim().length() == 0) {
if (deposit.getDepositStatusRef() == null || StringUtils.isBlank(deposit.getDepositStatusRef())) {
LOG.debug(PRECONDITION_FAILED + " missing Deposit status reference.", deposit.getId());
return false;
}
Expand All @@ -258,23 +275,12 @@ static BiPredicate<Deposit, Deposit> postcondition() {
return (deposit, updatedDeposit) -> Objects.nonNull(updatedDeposit.getRepositoryCopy());
}

static Function<Deposit, Deposit> critical(Repositories repositories, PassClient passClient) {
static Function<Deposit, Deposit> critical(RepositoryConfig repositoryConfig,
DepositStatusProcessor depositStatusProcessor) {
return (deposit) -> {
AtomicReference<DepositStatus> status = new AtomicReference<>();
try {

Repository repo = passClient.getObject(deposit.getRepository());

RepositoryConfig repoConfig = lookupConfig(repo, repositories)
.orElseThrow(() ->
new RemedialDepositException(
format(ERR_RESOLVE_REPOSITORY, repo.getName(), repo.getId()), repo));
DepositStatusProcessor statusProcessor = repoConfig.getRepositoryDepositConfig()
.getDepositProcessing()
.getProcessor();
status.set(statusProcessor.process(deposit, repoConfig));
} catch (RemedialDepositException e) {
throw e;
status.set(depositStatusProcessor.process(deposit, repositoryConfig));
} catch (Exception e) {
String msg = format(ERR_PARSING_STATUS_DOC,
deposit.getId(), deposit.getDepositStatusRef(), e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private void updateSubmittedDeposits(ZonedDateTime submissionFromDate) throws IO
submittedDeposits.forEach(deposit -> {
try {
LOG.info("Updating Deposit.depositStatus for {}", deposit.getId());
depositHelper.processDepositStatus(deposit.getId());
depositHelper.processDepositStatus(deposit);
} catch (Exception e) {
LOG.warn("Failed to update Deposit {}: {}", deposit.getId(), e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2024 Johns Hopkins University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.pass.deposit.service;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.time.ZonedDateTime;

import org.eclipse.pass.deposit.util.ResourceTestUtil;
import org.eclipse.pass.support.client.model.CopyStatus;
import org.eclipse.pass.support.client.model.Deposit;
import org.eclipse.pass.support.client.model.DepositStatus;
import org.eclipse.pass.support.client.model.Repository;
import org.eclipse.pass.support.client.model.RepositoryCopy;
import org.eclipse.pass.support.client.model.Submission;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.TestPropertySource;

/**
* @author Russ Poetker ([email protected])
*/
@TestPropertySource(properties = {
"pass.deposit.repository.configuration=classpath:/full-test-repositories.json"
})
public class DepositProcessorIT extends AbstractDepositIT {

@Autowired private DepositProcessor depositProcessor;

@Test
public void testDepositProcessor_WithDepositProcessor() throws Exception {
// GIVEN
Deposit j10pDeposit = initJScholarshipDeposit();
mockSword();

// WHEN
depositProcessor.accept(j10pDeposit);

// THEN
Deposit actualDeposit = passClient.getObject(j10pDeposit);
assertEquals(DepositStatus.ACCEPTED, actualDeposit.getDepositStatus());
assertEquals("test-j10p-ref", actualDeposit.getDepositStatusRef());
verify(passClient).updateObject(eq(actualDeposit));
}

@Test
public void testDepositProcessor_NoUpdateOnDepositNoDepositProcessor() throws Exception {
// GIVEN
Deposit pmcDeposit = initPmcDeposit();

// WHEN
depositProcessor.accept(pmcDeposit);

// THEN
Deposit actualDeposit = passClient.getObject(pmcDeposit);
assertEquals(DepositStatus.SUBMITTED, actualDeposit.getDepositStatus());
assertEquals("test-pmc-package", actualDeposit.getDepositStatusRef());
verify(passClient, times(0)).updateObject(eq(actualDeposit));
}

private Deposit initPmcDeposit() throws Exception {
Submission submission = findSubmission(createSubmission(
ResourceTestUtil.readSubmissionJson("sample1")));
submission.setSubmittedDate(ZonedDateTime.now());
passClient.updateObject(submission);
Repository pmcRepo = submission.getRepositories().stream()
.filter(repository -> repository.getRepositoryKey().equals("PubMed Central"))
.findFirst().get();
Deposit pmcDeposit = new Deposit();
pmcDeposit.setSubmission(submission);
pmcDeposit.setRepository(pmcRepo);
pmcDeposit.setDepositStatus(DepositStatus.SUBMITTED);
pmcDeposit.setDepositStatusRef("test-pmc-package");
passClient.createObject(pmcDeposit);
return pmcDeposit;
}

private Deposit initJScholarshipDeposit() throws Exception {
Submission submission = findSubmission(createSubmission(
ResourceTestUtil.readSubmissionJson("sample1")));
submission.setSubmittedDate(ZonedDateTime.now());
passClient.updateObject(submission);
Repository j10pRepo = submission.getRepositories().stream()
.filter(repository -> repository.getRepositoryKey().equals("JScholarship"))
.findFirst().get();
RepositoryCopy repositoryCopy = new RepositoryCopy();
repositoryCopy.setRepository(j10pRepo);
repositoryCopy.setCopyStatus(CopyStatus.IN_PROGRESS);
passClient.createObject(repositoryCopy);
Deposit j10pDeposit = new Deposit();
j10pDeposit.setSubmission(submission);
j10pDeposit.setRepository(j10pRepo);
j10pDeposit.setRepositoryCopy(repositoryCopy);
j10pDeposit.setDepositStatus(DepositStatus.SUBMITTED);
j10pDeposit.setDepositStatusRef("test-j10p-ref");
passClient.createObject(j10pDeposit);
return j10pDeposit;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void acceptDepositWithIntermediateStatus() {

verify(intermediateDeposit).getDepositStatus();
verifyNoInteractions(cri);
verify(depositTaskHelper).processDepositStatus(depositId);
verify(depositTaskHelper).processDepositStatus(intermediateDeposit);
}

private void prepareCriFuncCriticalSuccess(DepositStatus depositStatus) throws IOException {
Expand Down
Loading

0 comments on commit 210aedb

Please sign in to comment.