Skip to content

Commit

Permalink
Make DSpace deposit able to recover from errors. Add ability for depo…
Browse files Browse the repository at this point in the history
…sit sessions to access the Deposit object
  • Loading branch information
markpatton committed Dec 16, 2024
1 parent ff25f43 commit fdd7f5c
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ static Function<Deposit, TransportResponse> performDeposit(DepositWorkerContext

Transport transport = getTransport(packager, dc);
try (TransportSession transportSession = transport.open(packagerConfig)) {
TransportResponse tr = transportSession.send(packageStream, packagerConfig, dc);
TransportResponse tr = transportSession.send(packageStream, packagerConfig, deposit);
deposit.setDepositStatus(DepositStatus.SUBMITTED);
deposit.setDepositStatusRef(packageStream.metadata().packageDepositStatusRef());
return tr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,4 +306,15 @@ public void createWorkflowItem(int workspaceItemId, AuthContext authContext) {
.body(workspaceItemUrl)
.retrieve().toBodilessEntity();
}

public String getWorkspaceItem(int workspaceItemId, AuthContext authContext) {
String json = restClient.get()
.uri("/submission/workspaceitems/{workspaceItemId}", workspaceItemId)
.header("Authorization", authContext.authToken())
.header("X-XSRF-TOKEN", authContext.xsrfToken())
.header("Cookie", "DSPACE-XSRF-COOKIE=" + authContext.xsrfToken())
.retrieve().body(String.class);

return json;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.eclipse.pass.deposit.assembler.PackageStream;
import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext;
import org.eclipse.pass.support.client.model.Deposit;

/**
* Represents an open connection, or the promise of a successful connection, with a service or system that will accept
Expand All @@ -45,10 +46,10 @@ public interface TransportSession extends AutoCloseable {
*
* @param packageStream the package and package metadata
* @param metadata transport-related metadata, or any "extra" package metadata
* @param dc information about PASS objects
* @param deposit deposit representation
* @return a response indicating success or failure of the transfer
*/
TransportResponse send(PackageStream packageStream, Map<String, String> metadata, DepositWorkerContext dc);
TransportResponse send(PackageStream packageStream, Map<String, String> metadata, Deposit deposit);

boolean closed();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.eclipse.pass.deposit.cri.CriticalRepositoryInteraction;
import org.eclipse.pass.deposit.cri.CriticalRepositoryInteraction.CriticalResult;
import org.eclipse.pass.deposit.service.DepositUtil;
import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext;
import org.eclipse.pass.deposit.transport.Transport;
import org.eclipse.pass.deposit.transport.TransportResponse;
import org.eclipse.pass.deposit.transport.TransportSession;
Expand Down Expand Up @@ -68,7 +67,7 @@ class DevNullTransportSession implements TransportSession {

@Override
public TransportResponse send(PackageStream packageStream, Map<String, String> metadata,
DepositWorkerContext dc) {
Deposit deposit) {
// no-op, just return successful response
return new TransportResponse() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@

import java.util.Map;

import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import org.eclipse.pass.deposit.assembler.PackageStream;
import org.eclipse.pass.deposit.model.DepositSubmission;
import org.eclipse.pass.deposit.provider.dspace.DSpaceMetadataMapper;
import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext;
import org.eclipse.pass.deposit.support.dspace.DspaceDepositService;
import org.eclipse.pass.deposit.support.dspace.DspaceDepositService.AuthContext;
import org.eclipse.pass.deposit.transport.TransportResponse;
import org.eclipse.pass.deposit.transport.TransportSession;
import org.eclipse.pass.support.client.PassClient;
import org.eclipse.pass.support.client.model.Deposit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,38 +51,64 @@ public DSpaceSession(DspaceDepositService dspaceDepositService, DSpaceMetadataMa
}

@Override
public TransportResponse send(PackageStream packageStream, Map<String, String> metadata, DepositWorkerContext dc) {
public TransportResponse send(PackageStream packageStream, Map<String, String> metadata, Deposit deposit) {
try {
DepositSubmission depositSubmission = packageStream.getDepositSubmission();

LOG.warn("Processing Dspace Deposit for Submission: {}", depositSubmission.getId());
LOG.info("Processing Dspace Deposit for Submission: {}", depositSubmission.getId());

AuthContext authContext = dspaceDepositService.authenticate();

String patchJson = dspaceMetadataMapper.patchWorkspaceItem(depositSubmission);
String workspaceItemJson = dspaceDepositService.createWorkspaceItem(
packageStream.getCustodialContent(), authContext);
// Create WorkspaceItem if it does not already exist

LOG.debug("Created WorkspaceItem: {}", workspaceItemJson);
int workspaceItemId = -1;
DocumentContext workspaceItemContext = null;

// TODO Set workspace item id on Deposit.depositStatusRef so can check if it already exists.
// TODO Then check metadata to see if it needs to be patched.
try {
workspaceItemId = Integer.parseInt(deposit.getDepositStatusRef());
} catch (NumberFormatException e) {}

int workspaceItemId = JsonPath.parse(workspaceItemJson).read("$._embedded.workspaceitems[0].id");
String itemUuid = JsonPath.parse(workspaceItemJson).read(
"$._embedded.workspaceitems[0]._embedded.item.uuid");
if (workspaceItemId == -1) {
workspaceItemContext = JsonPath.parse(dspaceDepositService.createWorkspaceItem(
packageStream.getCustodialContent(), authContext));
workspaceItemId = workspaceItemContext.read("$._embedded.workspaceitems[0].id");

LOG.debug("Patching WorkspaceItem to add metadata {}", patchJson);
dspaceDepositService.patchWorkspaceItem(workspaceItemId, patchJson, authContext);
// Use the deposit status ref to mark that the workspace item was created
deposit.setDepositStatusRef(String.valueOf(workspaceItemId));

// TODO what about packageStream.metadata() package ref, it will overwrite?

passClient.updateObject(deposit);

LOG.debug("Created WorkspaceItem: {}", workspaceItemId);
} else {
LOG.info("DSpace WorkspaceItem already exists for Submission: {}", depositSubmission.getId());

workspaceItemContext = JsonPath.parse(dspaceDepositService.getWorkspaceItem(workspaceItemId, authContext));
}

// Check metadata to see if it needs to be patched.

Map<String, Object> itemMetadata = workspaceItemContext.read("$._embedded.workspaceitems[0]._embedded.item.metadata");

if (itemMetadata.size() == 0) {
String patchJson = dspaceMetadataMapper.patchWorkspaceItem(depositSubmission);

LOG.debug("Patching WorkspaceItem to add metadata {}", patchJson);
dspaceDepositService.patchWorkspaceItem(workspaceItemId, patchJson, authContext);
}

LOG.debug("Creating WorkflowItem for WorkspaceItem {}", workspaceItemId);
dspaceDepositService.createWorkflowItem(workspaceItemId, authContext);

// Item which should be published
String itemUuid = workspaceItemContext.read("$._embedded.workspaceitems[0]._embedded.item.uuid");
String accessUrl = dspaceDepositService.createAccessUrlFromItemUuid(itemUuid);

// TODO 422 indicates validation errors. Should mark the deposit as failed and not retry.
// TODO Message to user

LOG.warn("Completed DSpace Deposit for Submission: {}, accessUrl: {}",
LOG.info("Completed DSpace Deposit for Submission: {}, accessUrl: {}",
depositSubmission.getId(), accessUrl);
return new DspaceResponse(true, accessUrl);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.eclipse.pass.deposit.cri.CriticalRepositoryInteraction;
import org.eclipse.pass.deposit.cri.CriticalRepositoryInteraction.CriticalResult;
import org.eclipse.pass.deposit.service.DepositUtil;
import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext;
import org.eclipse.pass.deposit.transport.Transport;
import org.eclipse.pass.deposit.transport.TransportResponse;
import org.eclipse.pass.deposit.transport.TransportSession;
Expand Down Expand Up @@ -111,7 +110,7 @@ class FilesystemTransportSession implements TransportSession {

@Override
public TransportResponse send(PackageStream packageStream, Map<String, String> metadata,
DepositWorkerContext dc) {
Deposit deposit) {
String filename = packageStream.metadata().name();
AtomicReference<Exception> transportException = new AtomicReference<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@

import org.apache.commons.net.ftp.FTPClient;
import org.eclipse.pass.deposit.assembler.PackageStream;
import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext;
import org.eclipse.pass.deposit.transport.TransportResponse;
import org.eclipse.pass.deposit.transport.TransportSession;
import org.eclipse.pass.support.client.model.Deposit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -86,7 +86,7 @@ private FtpTransportSession(FTPClient ftpClient, ExecutorService executorService
}

@Override
public TransportResponse send(PackageStream packageStream, Map<String, String> metadata, DepositWorkerContext dc) {
public TransportResponse send(PackageStream packageStream, Map<String, String> metadata, Deposit deposit) {

PackageStream.Metadata streamMetadata = packageStream.metadata();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import org.eclipse.pass.deposit.assembler.PackageStream;
import org.eclipse.pass.deposit.model.DepositSubmission;
import org.eclipse.pass.deposit.provider.inveniordm.InvenioRdmMetadataMapper;
import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext;
import org.eclipse.pass.deposit.transport.TransportResponse;
import org.eclipse.pass.deposit.transport.TransportSession;
import org.eclipse.pass.support.client.model.Deposit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
Expand Down Expand Up @@ -82,7 +82,7 @@ class InvenioRdmSession implements TransportSession {
}

@Override
public TransportResponse send(PackageStream packageStream, Map<String, String> metadata, DepositWorkerContext dc) {
public TransportResponse send(PackageStream packageStream, Map<String, String> metadata, Deposit deposit) {
try {
DepositSubmission depositSubmission = packageStream.getDepositSubmission();
LOG.warn("Processing InvenioRDM Deposit for Submission: {}", depositSubmission.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import org.apache.sshd.sftp.common.SftpException;
import org.eclipse.pass.deposit.DepositServiceRuntimeException;
import org.eclipse.pass.deposit.assembler.PackageStream;
import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext;
import org.eclipse.pass.deposit.transport.TransportResponse;
import org.eclipse.pass.deposit.transport.TransportSession;
import org.eclipse.pass.support.client.model.Deposit;

/**
* @author Russ Poetker ([email protected])
Expand All @@ -58,7 +58,7 @@ class SftpTransportSession implements TransportSession {
}

@Override
public TransportResponse send(PackageStream packageStream, Map<String, String> metadata, DepositWorkerContext dc) {
public TransportResponse send(PackageStream packageStream, Map<String, String> metadata, Deposit deposit) {
PackageStream.Metadata streamMetadata = packageStream.metadata();
String fileName = streamMetadata.name();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.gson.JsonElement;
import org.eclipse.pass.deposit.assembler.PackageOptions.Checksum;
import org.eclipse.pass.deposit.assembler.PackageStream;
import org.eclipse.pass.deposit.service.DepositUtil.DepositWorkerContext;
import org.eclipse.pass.deposit.transport.TransportResponse;
import org.eclipse.pass.deposit.transport.TransportSession;
import org.slf4j.Logger;
Expand Down Expand Up @@ -113,7 +112,8 @@ public Sword2TransportSession(SWORDClient client, ServiceDocument serviceDocumen
* SWORD v2 Profile</a>
*/
@Override
public TransportResponse send(PackageStream packageStream, Map<String, String> metadata, DepositWorkerContext dc) {
public TransportResponse send(PackageStream packageStream, Map<String, String> metadata,
org.eclipse.pass.support.client.model.Deposit passDeposit) {
if (closed) {
throw new IllegalStateException("SWORDv2 transport session has been closed.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ private void initDSpaceApiStubs() throws IOException {

stubFor(post("/dspace/api/submission/workspaceitems?owningCollection=collectionuuid")
.willReturn(WireMock.ok("{\"_embedded\": {\"workspaceitems\": [{\"id\": 1,"
+ "\"_embedded\": {\"item\": {\"uuid\": \"uuid\"}}}]}}")));
+ "\"_embedded\": {\"item\": {\"uuid\": \"uuid\", \"metadata\": {}}}}]}}")));

stubFor(patch("/dspace/api/submission/workspaceitems/1").willReturn(WireMock.ok()));

Expand Down

0 comments on commit fdd7f5c

Please sign in to comment.