From 12075a0410171ce2c7c05782e73eac68ed4e9f62 Mon Sep 17 00:00:00 2001 From: Mark Patton Date: Mon, 16 Dec 2024 09:28:17 -0500 Subject: [PATCH] Make DSpace deposit able to recover from errors. Add ability for deposit sessions to access the Deposit object --- .../pass/deposit/service/DepositTask.java | 2 +- .../support/dspace/DspaceDepositService.java | 11 ++++ .../deposit/transport/TransportSession.java | 5 +- .../transport/devnull/DevNullTransport.java | 3 +- .../transport/dspace/DSpaceSession.java | 57 ++++++++++++++----- .../transport/fs/FilesystemTransport.java | 3 +- .../transport/ftp/FtpTransportSession.java | 4 +- .../inveniordm/InvenioRdmSession.java | 4 +- .../transport/sftp/SftpTransportSession.java | 4 +- .../sword2/Sword2TransportSession.java | 4 +- .../service/SubmissionProcessorIT.java | 2 +- 11 files changed, 68 insertions(+), 31 deletions(-) diff --git a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/service/DepositTask.java b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/service/DepositTask.java index 79aee2eb..1bbbe476 100644 --- a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/service/DepositTask.java +++ b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/service/DepositTask.java @@ -437,7 +437,7 @@ static Function performDeposit(DepositWorkerContext Transport transport = getTransport(packager, dc); try (TransportSession transportSession = transport.open(packagerConfig)) { - TransportResponse tr = transportSession.send(packageStream, packagerConfig, dc); + TransportResponse tr = transportSession.send(packageStream, packagerConfig, deposit); deposit.setDepositStatus(DepositStatus.SUBMITTED); deposit.setDepositStatusRef(packageStream.metadata().packageDepositStatusRef()); return tr; diff --git a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/support/dspace/DspaceDepositService.java b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/support/dspace/DspaceDepositService.java index f6fc833f..a94a1fe1 100644 --- a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/support/dspace/DspaceDepositService.java +++ b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/support/dspace/DspaceDepositService.java @@ -306,4 +306,15 @@ public void createWorkflowItem(int workspaceItemId, AuthContext authContext) { .body(workspaceItemUrl) .retrieve().toBodilessEntity(); } + + public String getWorkspaceItem(int workspaceItemId, AuthContext authContext) { + String json = restClient.get() + .uri("/submission/workspaceitems/{workspaceItemId}", workspaceItemId) + .header("Authorization", authContext.authToken()) + .header("X-XSRF-TOKEN", authContext.xsrfToken()) + .header("Cookie", "DSPACE-XSRF-COOKIE=" + authContext.xsrfToken()) + .retrieve().body(String.class); + + return json; + } } diff --git a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/TransportSession.java b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/TransportSession.java index d980ff3d..d875c527 100644 --- a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/TransportSession.java +++ b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/TransportSession.java @@ -21,6 +21,7 @@ import org.eclipse.pass.deposit.assembler.PackageStream; import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext; +import org.eclipse.pass.support.client.model.Deposit; /** * Represents an open connection, or the promise of a successful connection, with a service or system that will accept @@ -45,10 +46,10 @@ public interface TransportSession extends AutoCloseable { * * @param packageStream the package and package metadata * @param metadata transport-related metadata, or any "extra" package metadata - * @param dc information about PASS objects + * @param deposit deposit representation * @return a response indicating success or failure of the transfer */ - TransportResponse send(PackageStream packageStream, Map metadata, DepositWorkerContext dc); + TransportResponse send(PackageStream packageStream, Map metadata, Deposit deposit); boolean closed(); diff --git a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/devnull/DevNullTransport.java b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/devnull/DevNullTransport.java index 76b71875..38028c6f 100644 --- a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/devnull/DevNullTransport.java +++ b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/devnull/DevNullTransport.java @@ -23,7 +23,6 @@ import org.eclipse.pass.deposit.cri.CriticalRepositoryInteraction; import org.eclipse.pass.deposit.cri.CriticalRepositoryInteraction.CriticalResult; import org.eclipse.pass.deposit.service.DepositUtil; -import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext; import org.eclipse.pass.deposit.transport.Transport; import org.eclipse.pass.deposit.transport.TransportResponse; import org.eclipse.pass.deposit.transport.TransportSession; @@ -68,7 +67,7 @@ class DevNullTransportSession implements TransportSession { @Override public TransportResponse send(PackageStream packageStream, Map metadata, - DepositWorkerContext dc) { + Deposit deposit) { // no-op, just return successful response return new TransportResponse() { @Override diff --git a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/dspace/DSpaceSession.java b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/dspace/DSpaceSession.java index 71dee4bd..f4aa18e3 100644 --- a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/dspace/DSpaceSession.java +++ b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/dspace/DSpaceSession.java @@ -17,16 +17,17 @@ import java.util.Map; +import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; import org.eclipse.pass.deposit.assembler.PackageStream; import org.eclipse.pass.deposit.model.DepositSubmission; import org.eclipse.pass.deposit.provider.dspace.DSpaceMetadataMapper; -import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext; import org.eclipse.pass.deposit.support.dspace.DspaceDepositService; import org.eclipse.pass.deposit.support.dspace.DspaceDepositService.AuthContext; import org.eclipse.pass.deposit.transport.TransportResponse; import org.eclipse.pass.deposit.transport.TransportSession; import org.eclipse.pass.support.client.PassClient; +import org.eclipse.pass.support.client.model.Deposit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,38 +51,64 @@ public DSpaceSession(DspaceDepositService dspaceDepositService, DSpaceMetadataMa } @Override - public TransportResponse send(PackageStream packageStream, Map metadata, DepositWorkerContext dc) { + public TransportResponse send(PackageStream packageStream, Map metadata, Deposit deposit) { try { DepositSubmission depositSubmission = packageStream.getDepositSubmission(); - LOG.warn("Processing Dspace Deposit for Submission: {}", depositSubmission.getId()); + LOG.info("Processing Dspace Deposit for Submission: {}", depositSubmission.getId()); AuthContext authContext = dspaceDepositService.authenticate(); - String patchJson = dspaceMetadataMapper.patchWorkspaceItem(depositSubmission); - String workspaceItemJson = dspaceDepositService.createWorkspaceItem( - packageStream.getCustodialContent(), authContext); + // Create WorkspaceItem if it does not already exist - LOG.debug("Created WorkspaceItem: {}", workspaceItemJson); + int workspaceItemId = -1; + DocumentContext workspaceItemContext = null; - // TODO Set workspace item id on Deposit.depositStatusRef so can check if it already exists. - // TODO Then check metadata to see if it needs to be patched. + try { + workspaceItemId = Integer.parseInt(deposit.getDepositStatusRef()); + } catch (NumberFormatException e) {} - int workspaceItemId = JsonPath.parse(workspaceItemJson).read("$._embedded.workspaceitems[0].id"); - String itemUuid = JsonPath.parse(workspaceItemJson).read( - "$._embedded.workspaceitems[0]._embedded.item.uuid"); + if (workspaceItemId == -1) { + workspaceItemContext = JsonPath.parse(dspaceDepositService.createWorkspaceItem( + packageStream.getCustodialContent(), authContext)); + workspaceItemId = workspaceItemContext.read("$._embedded.workspaceitems[0].id"); - LOG.debug("Patching WorkspaceItem to add metadata {}", patchJson); - dspaceDepositService.patchWorkspaceItem(workspaceItemId, patchJson, authContext); + // Use the deposit status ref to mark that the workspace item was created + deposit.setDepositStatusRef(String.valueOf(workspaceItemId)); + + // TODO what about packageStream.metadata() package ref, it will overwrite? + + passClient.updateObject(deposit); + + LOG.debug("Created WorkspaceItem: {}", workspaceItemId); + } else { + LOG.info("DSpace WorkspaceItem already exists for Submission: {}", depositSubmission.getId()); + + workspaceItemContext = JsonPath.parse(dspaceDepositService.getWorkspaceItem(workspaceItemId, authContext)); + } + + // Check metadata to see if it needs to be patched. + + Map itemMetadata = workspaceItemContext.read("$._embedded.workspaceitems[0]._embedded.item.metadata"); + + if (itemMetadata.size() == 0) { + String patchJson = dspaceMetadataMapper.patchWorkspaceItem(depositSubmission); + + LOG.debug("Patching WorkspaceItem to add metadata {}", patchJson); + dspaceDepositService.patchWorkspaceItem(workspaceItemId, patchJson, authContext); + } LOG.debug("Creating WorkflowItem for WorkspaceItem {}", workspaceItemId); dspaceDepositService.createWorkflowItem(workspaceItemId, authContext); + // Item which should be published + String itemUuid = workspaceItemContext.read("$._embedded.workspaceitems[0]._embedded.item.uuid"); String accessUrl = dspaceDepositService.createAccessUrlFromItemUuid(itemUuid); // TODO 422 indicates validation errors. Should mark the deposit as failed and not retry. + // TODO Message to user - LOG.warn("Completed DSpace Deposit for Submission: {}, accessUrl: {}", + LOG.info("Completed DSpace Deposit for Submission: {}, accessUrl: {}", depositSubmission.getId(), accessUrl); return new DspaceResponse(true, accessUrl); } catch (Exception e) { diff --git a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/fs/FilesystemTransport.java b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/fs/FilesystemTransport.java index 96e33fb1..6d7d9b2c 100644 --- a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/fs/FilesystemTransport.java +++ b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/fs/FilesystemTransport.java @@ -33,7 +33,6 @@ import org.eclipse.pass.deposit.cri.CriticalRepositoryInteraction; import org.eclipse.pass.deposit.cri.CriticalRepositoryInteraction.CriticalResult; import org.eclipse.pass.deposit.service.DepositUtil; -import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext; import org.eclipse.pass.deposit.transport.Transport; import org.eclipse.pass.deposit.transport.TransportResponse; import org.eclipse.pass.deposit.transport.TransportSession; @@ -111,7 +110,7 @@ class FilesystemTransportSession implements TransportSession { @Override public TransportResponse send(PackageStream packageStream, Map metadata, - DepositWorkerContext dc) { + Deposit deposit) { String filename = packageStream.metadata().name(); AtomicReference transportException = new AtomicReference<>(); diff --git a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/ftp/FtpTransportSession.java b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/ftp/FtpTransportSession.java index 554da4cd..7120d2cd 100644 --- a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/ftp/FtpTransportSession.java +++ b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/ftp/FtpTransportSession.java @@ -36,9 +36,9 @@ import org.apache.commons.net.ftp.FTPClient; import org.eclipse.pass.deposit.assembler.PackageStream; -import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext; import org.eclipse.pass.deposit.transport.TransportResponse; import org.eclipse.pass.deposit.transport.TransportSession; +import org.eclipse.pass.support.client.model.Deposit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,7 +86,7 @@ private FtpTransportSession(FTPClient ftpClient, ExecutorService executorService } @Override - public TransportResponse send(PackageStream packageStream, Map metadata, DepositWorkerContext dc) { + public TransportResponse send(PackageStream packageStream, Map metadata, Deposit deposit) { PackageStream.Metadata streamMetadata = packageStream.metadata(); diff --git a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/inveniordm/InvenioRdmSession.java b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/inveniordm/InvenioRdmSession.java index 846d71e4..74f23bf5 100644 --- a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/inveniordm/InvenioRdmSession.java +++ b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/inveniordm/InvenioRdmSession.java @@ -39,9 +39,9 @@ import org.eclipse.pass.deposit.assembler.PackageStream; import org.eclipse.pass.deposit.model.DepositSubmission; import org.eclipse.pass.deposit.provider.inveniordm.InvenioRdmMetadataMapper; -import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext; import org.eclipse.pass.deposit.transport.TransportResponse; import org.eclipse.pass.deposit.transport.TransportSession; +import org.eclipse.pass.support.client.model.Deposit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.Resource; @@ -82,7 +82,7 @@ class InvenioRdmSession implements TransportSession { } @Override - public TransportResponse send(PackageStream packageStream, Map metadata, DepositWorkerContext dc) { + public TransportResponse send(PackageStream packageStream, Map metadata, Deposit deposit) { try { DepositSubmission depositSubmission = packageStream.getDepositSubmission(); LOG.warn("Processing InvenioRDM Deposit for Submission: {}", depositSubmission.getId()); diff --git a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/sftp/SftpTransportSession.java b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/sftp/SftpTransportSession.java index 980e04cc..3d2c10f8 100644 --- a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/sftp/SftpTransportSession.java +++ b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/sftp/SftpTransportSession.java @@ -42,9 +42,9 @@ import org.apache.sshd.sftp.common.SftpException; import org.eclipse.pass.deposit.DepositServiceRuntimeException; import org.eclipse.pass.deposit.assembler.PackageStream; -import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext; import org.eclipse.pass.deposit.transport.TransportResponse; import org.eclipse.pass.deposit.transport.TransportSession; +import org.eclipse.pass.support.client.model.Deposit; /** * @author Russ Poetker (rpoetke1@jh.edu) @@ -58,7 +58,7 @@ class SftpTransportSession implements TransportSession { } @Override - public TransportResponse send(PackageStream packageStream, Map metadata, DepositWorkerContext dc) { + public TransportResponse send(PackageStream packageStream, Map metadata, Deposit deposit) { PackageStream.Metadata streamMetadata = packageStream.metadata(); String fileName = streamMetadata.name(); diff --git a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/sword2/Sword2TransportSession.java b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/sword2/Sword2TransportSession.java index e040328a..e9851580 100644 --- a/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/sword2/Sword2TransportSession.java +++ b/pass-deposit-services/deposit-core/src/main/java/org/eclipse/pass/deposit/transport/sword2/Sword2TransportSession.java @@ -29,7 +29,6 @@ import com.google.gson.JsonElement; import org.eclipse.pass.deposit.assembler.PackageOptions.Checksum; import org.eclipse.pass.deposit.assembler.PackageStream; -import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext; import org.eclipse.pass.deposit.transport.TransportResponse; import org.eclipse.pass.deposit.transport.TransportSession; import org.slf4j.Logger; @@ -113,7 +112,8 @@ public Sword2TransportSession(SWORDClient client, ServiceDocument serviceDocumen * SWORD v2 Profile */ @Override - public TransportResponse send(PackageStream packageStream, Map metadata, DepositWorkerContext dc) { + public TransportResponse send(PackageStream packageStream, Map metadata, + org.eclipse.pass.support.client.model.Deposit passDeposit) { if (closed) { throw new IllegalStateException("SWORDv2 transport session has been closed."); } diff --git a/pass-deposit-services/deposit-core/src/test/java/org/eclipse/pass/deposit/service/SubmissionProcessorIT.java b/pass-deposit-services/deposit-core/src/test/java/org/eclipse/pass/deposit/service/SubmissionProcessorIT.java index 624b4fe8..98ff9fc6 100644 --- a/pass-deposit-services/deposit-core/src/test/java/org/eclipse/pass/deposit/service/SubmissionProcessorIT.java +++ b/pass-deposit-services/deposit-core/src/test/java/org/eclipse/pass/deposit/service/SubmissionProcessorIT.java @@ -402,7 +402,7 @@ private void initDSpaceApiStubs() throws IOException { stubFor(post("/dspace/api/submission/workspaceitems?owningCollection=collectionuuid") .willReturn(WireMock.ok("{\"_embedded\": {\"workspaceitems\": [{\"id\": 1," - + "\"_embedded\": {\"item\": {\"uuid\": \"uuid\"}}}]}}"))); + + "\"_embedded\": {\"item\": {\"uuid\": \"uuid\", \"metadata\": {}}}}]}}"))); stubFor(patch("/dspace/api/submission/workspaceitems/1").willReturn(WireMock.ok()));