Skip to content

Commit

Permalink
NXP-32515: Propagate retrieve requests
Browse files Browse the repository at this point in the history
  • Loading branch information
guirenard committed Jun 7, 2024
1 parent 1df1a73 commit 1c6107e
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* (C) Copyright 2024 Nuxeo (http://nuxeo.com/) and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* Guillaume Renard
*/

package org.nuxeo.coldstorage.action;

import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM;
import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1;
import static org.nuxeo.lib.stream.computation.AbstractComputation.OUTPUT_1;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.coldstorage.ColdStorageConstants;
import org.nuxeo.coldstorage.service.ColdStorageService;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentModelList;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.stream.StreamProcessorTopology;

/**
* Bulk action in charge of marking documents being retrieved.
*
* @since 2023.4
*/
public class PropagateRetrieveFromColdStorageContentAction implements StreamProcessorTopology {

private static final Logger log = LogManager.getLogger(PropagateRetrieveFromColdStorageContentAction.class);

public static final String ACTION_NAME = "propagateRetrieveFromColdStorage";

public static final String ACTION_FULL_NAME = "bulk/" + ACTION_NAME;

@Override
public Topology getTopology(Map<String, String> options) {
return Topology.builder()
.addComputation(PropagateRetrieveFromColdStorageContentComputation::new, //
List.of(INPUT_1 + ":" + ACTION_FULL_NAME, OUTPUT_1 + ":" + STATUS_STREAM))
.build();
}

public static class PropagateRetrieveFromColdStorageContentComputation extends AbstractBulkComputation {

public PropagateRetrieveFromColdStorageContentComputation() {
super(ACTION_FULL_NAME);
}

@Override
protected void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties) {
log.debug("Start computing documents of which a retrieve from ColdStorage is ongoing {}", ids);
DocumentModelList documents = loadDocuments(session, ids);

ColdStorageService service = Framework.getService(ColdStorageService.class);

long errorCount = 0;
for (DocumentModel document : documents) {
if (!document.hasFacet(ColdStorageConstants.COLD_STORAGE_FACET_NAME)) {
log.info("The main content for document: {} is not in cold storage.", document::getId);
continue;
}
try {
service.proceedRetrieveMainContent(session, document);
} catch (NuxeoException e) {
errorCount++;
delta.inError(String.format("Cannot propagate retrieve from cold storage for document %s: %s", document.getId(),
e.getMessage()));
log.warn("Could not propagate retrieve from cold storage for document: {}", document::getId,
() -> e);
}
}
delta.setErrorCount(errorCount);
log.debug("End computing documents of which a retrieve from ColdStorage is ongoing");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,11 @@ DocumentModel proceedRestoreMainContent(CoreSession session, DocumentModel docum
*/
void propagateRestoreFromColdStorage(CoreSession session, String blobDigest);

/**
* Internal use.
*
* @since 2023.4
*/
void proceedRetrieveMainContent(CoreSession session, DocumentModel documentModel);

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.nuxeo.coldstorage.action.CheckColdStorageAvailabilityAction;
import org.nuxeo.coldstorage.action.PropagateMoveToColdStorageContentAction;
import org.nuxeo.coldstorage.action.PropagateRestoreFromColdStorageContentAction;
import org.nuxeo.coldstorage.action.PropagateRetrieveFromColdStorageContentAction;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
Expand Down Expand Up @@ -280,7 +281,7 @@ public DocumentModel proceedMoveToColdStorage(CoreSession session, DocumentRef d
BlobUpdateContext updateContext = new BlobUpdateContext(key).withColdStorageClass(true);
Framework.getService(BlobManager.class).getBlobProvider(coldContent).updateBlob(updateContext);
} else {
log.warn("Main blob {} for document {} is already in cold storage with storage class {}",
log.debug("Main blob {} for document {} is already in cold storage with storage class {}",
coldContent::getDigest, documentModel::getId, oldStatus::getStorageClass);
}
} catch (IOException e) {
Expand Down Expand Up @@ -348,23 +349,27 @@ public DocumentModel retrieveFromColdStorage(CoreSession session, DocumentRef do
documentModel.setPropertyValue(COLD_STORAGE_CONTENT_DOWNLOADABLE_UNTIL,
Date.from(blobStatus.getDownloadableUntil()));
doNotify = doc -> false;
} else if (blobStatus.isOngoingRestore()) {
documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true);
doNotify = doc -> true;
} else {
try {
BlobUpdateContext updateContext = new BlobUpdateContext(key).withRestoreForDuration(restoreDuration);
Framework.getService(BlobManager.class).getBlobProvider(coldContent).updateBlob(updateContext);
} catch (IOException e) {
log.error("Could not retrieve document {} for duration {} seconds", documentModel::getId,
restoreDuration::getSeconds);
throw new NuxeoException(e);
if (blobStatus.isOngoingRestore()) {
documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true);
doNotify = doc -> true;
} else {
try {
BlobUpdateContext updateContext = new BlobUpdateContext(key).withRestoreForDuration(
restoreDuration);
Framework.getService(BlobManager.class).getBlobProvider(coldContent).updateBlob(updateContext);
} catch (IOException e) {
log.error("Could not retrieve document {} for duration {} seconds", documentModel::getId,
restoreDuration::getSeconds);
throw new NuxeoException(e);
}
documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true);
doNotify = doc -> CoreInstance.doPrivileged(session, s -> {
// The check retrieval may need to modify metadata of document too
return !checkIsRetrieved(s, doc);
});
}
documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true);
doNotify = doc -> CoreInstance.doPrivileged(session, s -> {
// The check retrieval may need to modify metadata of document too
return !checkIsRetrieved(s, doc);
});
propagateRetrieveFromColdStorage(session, key);
}
docResult = CoreInstance.doPrivileged(session, s -> {
// The retrieval is allowed for users with only READ access.
Expand Down Expand Up @@ -531,7 +536,7 @@ public void propagateMoveToColdStorage(CoreSession session, String blobDigest) {
* Restore from ColdStorage all documents referencing the given blob digests as main content.
*
* @param session the session
* @param blobDigests the blob digests
* @param blobDigest the blob digests
*/
public void propagateRestoreFromColdStorage(CoreSession session, String blobDigest) {
String query = String.format("SELECT * FROM Document WHERE %s/digest = '%s'", COLD_STORAGE_CONTENT_PROPERTY,
Expand All @@ -545,6 +550,25 @@ public void propagateRestoreFromColdStorage(CoreSession session, String blobDige
() -> bulkService.getStatus(commandId));
}

/**
* Retrieve from ColdStorage all documents referencing the given blob digests as main content.
*
* @param session the session
* @param blobDigest the blob digests
* @since 2023.4
*/
public void propagateRetrieveFromColdStorage(CoreSession session, String blobDigest) {
String query = String.format("SELECT * FROM Document WHERE %s/digest = '%s' AND (%s IS NULL OR %s = 0)",
COLD_STORAGE_CONTENT_PROPERTY, blobDigest, COLD_STORAGE_BEING_RETRIEVED_PROPERTY, COLD_STORAGE_BEING_RETRIEVED_PROPERTY);

BulkService bulkService = Framework.getService(BulkService.class);
String username = SecurityConstants.SYSTEM_USERNAME;
String commandId = bulkService.submitTransactional(new BulkCommand.Builder(
PropagateRetrieveFromColdStorageContentAction.ACTION_NAME, query, username).build());
log.debug("Restoring documents referencing blob: {}, status: {}", () -> blobDigest,
() -> bulkService.getStatus(commandId));
}

@Override
public void checkDocToBeRetrieved(CoreSession session) {
BulkService bulkService = Framework.getService(BulkService.class);
Expand Down Expand Up @@ -634,4 +658,10 @@ protected void fireEvent(DocumentModel doc, CoreSession session, String eventNam
eventService.fireEvent(event);
}

@Override
public void proceedRetrieveMainContent(CoreSession session, DocumentModel documentModel) {
documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true);
fireEvent(documentModel, session, COLD_STORAGE_CONTENT_TO_RETRIEVE_EVENT_NAME);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
bucketSize="100" batchSize="20" httpEnabled="false" />
<action name="propagateRestoreFromColdStorage" inputStream="bulk/propagateRestoreFromColdStorage"
bucketSize="100" batchSize="20" httpEnabled="false" />
<action name="propagateRetrieveFromColdStorage" inputStream="bulk/propagateRetrieveFromColdStorage"
bucketSize="100" batchSize="20" httpEnabled="false" />
<action name="checkColdStorageAvailability" inputStream="bulk/checkColdStorageAvailability"
bucketSize="100" batchSize="20" httpEnabled="false" />
</extension>
Expand All @@ -31,6 +33,12 @@
defaultPartitions="${nuxeo.bulk.action.propagateRestoreFromColdStorage.defaultPartitions:=4}">
<policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true" />
</streamProcessor>
<streamProcessor name="propagateRetrieveFromColdStorage"
class="org.nuxeo.coldstorage.action.PropagateRetrieveFromColdStorageContentAction"
defaultConcurrency="${nuxeo.bulk.action.propagateRetrieveFromColdStorage.defaultConcurrency:=2}"
defaultPartitions="${nuxeo.bulk.action.propagateRetrieveFromColdStorage.defaultPartitions:=4}">
<policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true" />
</streamProcessor>
<streamProcessor name="checkColdStorageAvailability"
class="org.nuxeo.coldstorage.action.CheckColdStorageAvailabilityAction"
defaultScroller="${nuxeo.bulk.action.checkColdStorageAvailability.scroller:=default}"
Expand Down

0 comments on commit 1c6107e

Please sign in to comment.