Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch: Async handling of indexing/deletion requests #8465

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
d3818ea
Handle async responses asynchronously.
tobias-hotz Oct 22, 2024
2c08974
Delegate the responsibility for batching to the caller
tobias-hotz Oct 29, 2024
418de0a
Dynamically compute the batch size based on the number of expected in…
tobias-hotz Oct 30, 2024
9519a92
Use a cleaner to make sure all documents get send to the index, even …
tobias-hotz Oct 31, 2024
a9a919e
Don't use list version of indexMetadata for single entries
tobias-hotz Oct 31, 2024
0d49d86
Remove no longer required method forceIndexChanges
tobias-hotz Oct 31, 2024
ff5c5dc
Remove commit index changes from frontend
tobias-hotz Nov 5, 2024
34e1607
Change how a running index job is determined
tobias-hotz Nov 5, 2024
83ea448
Fixed using the wrong map in BatchingIndexSubmittor
tobias-hotz Nov 5, 2024
66e9c38
Fix field updating not refreshing
tobias-hotz Nov 7, 2024
948575b
Fix UserSelectionsApiTest being too strict about the submittor
tobias-hotz Nov 7, 2024
95e4a57
Add support for batch deletion as well
tobias-hotz Nov 11, 2024
827539e
Fix batch deletion
tobias-hotz Nov 11, 2024
61fb7ad
Allow delete by query to be "batched" by running them async
tobias-hotz Nov 11, 2024
9cd5242
submittor -> submitter
tobias-hotz Dec 6, 2024
2b6476c
Remove debug sleep
tobias-hotz Dec 6, 2024
b8952a0
Remove unused never implemented that still references forceRefreshRea…
tobias-hotz Dec 6, 2024
d6b9655
Improve log message when deleting
tobias-hotz Dec 6, 2024
c65cc71
Merge remote-tracking branch 'refs/remotes/upstream/main' into async_…
tobias-hotz Dec 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions core/src/main/java/org/fao/geonet/kernel/DataManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.fao.geonet.kernel.datamanager.IMetadataValidator;
import org.fao.geonet.kernel.schema.MetadataSchema;
import org.fao.geonet.kernel.search.IndexingMode;
import org.fao.geonet.kernel.search.submission.IIndexSubmitter;
import org.fao.geonet.repository.UserGroupRepository;
import org.jdom.Element;
import org.slf4j.Logger;
Expand All @@ -65,7 +66,6 @@
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -153,19 +153,14 @@ public void batchIndexInThreadPool(ServiceContext context, List<?> metadataIds)
metadataIndexer.batchIndexInThreadPool(context, metadataIds);
}

@Deprecated
public boolean isIndexing() {
return metadataIndexer.isIndexing();
}

@Deprecated
public void indexMetadata(final List<String> metadataIds) throws Exception {
metadataIndexer.indexMetadata(metadataIds);
}

@Deprecated
public void indexMetadata(final String metadataId, boolean forceRefreshReaders) throws Exception {
metadataIndexer.indexMetadata(metadataId, forceRefreshReaders, IndexingMode.full);
public void indexMetadata(final String metadataId, IIndexSubmitter indexSubmittor) throws Exception {
metadataIndexer.indexMetadata(metadataId, indexSubmittor, IndexingMode.full);
}

@Deprecated
Expand Down Expand Up @@ -361,10 +356,10 @@ public String insertMetadata(ServiceContext context, String schema, Element meta

@Deprecated
public AbstractMetadata insertMetadata(ServiceContext context, AbstractMetadata newMetadata, Element metadataXml, IndexingMode indexingMode,
boolean updateFixedInfo, UpdateDatestamp updateDatestamp, boolean fullRightsForGroup, boolean forceRefreshReaders)
boolean updateFixedInfo, UpdateDatestamp updateDatestamp, boolean fullRightsForGroup, IIndexSubmitter indexSubmittor)
throws Exception {
return metadataManager.insertMetadata(context, newMetadata, metadataXml, indexingMode, updateFixedInfo, updateDatestamp,
fullRightsForGroup, forceRefreshReaders);
fullRightsForGroup, indexSubmittor);
}

@Deprecated
Expand Down Expand Up @@ -578,11 +573,6 @@ public void flush() {
metadataManager.flush();
}

@Deprecated
public void forceIndexChanges() throws IOException {
metadataIndexer.forceIndexChanges();
}

@Deprecated
public int batchDeleteMetadataAndUpdateIndex(Specification<? extends AbstractMetadata> specification) throws Exception {
return metadataIndexer.batchDeleteMetadataAndUpdateIndex(specification);
Expand Down
19 changes: 4 additions & 15 deletions core/src/main/java/org/fao/geonet/kernel/IndexMetadataTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@
import org.fao.geonet.Util;
import org.fao.geonet.constants.Geonet;
import org.fao.geonet.domain.User;
import org.fao.geonet.kernel.search.EsSearchManager;
import org.fao.geonet.kernel.search.submission.batch.BatchingIndexSubmitter;
import org.fao.geonet.utils.Log;
import org.springframework.transaction.TransactionStatus;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -49,8 +47,6 @@ public final class IndexMetadataTask implements Runnable {
private final List<?> _metadataIds;
private final TransactionStatus _transactionStatus;
private final Set<IndexMetadataTask> _batchIndex;
private final EsSearchManager searchManager;
private final AtomicInteger indexed;
private User _user;

/**
Expand All @@ -61,13 +57,11 @@ public final class IndexMetadataTask implements Runnable {
* @param transactionStatus if non-null, wait for the transaction to complete before indexing
*/
public IndexMetadataTask(@Nonnull ServiceContext context, @Nonnull List<?> metadataIds, Set<IndexMetadataTask> batchIndex,
@Nullable TransactionStatus transactionStatus, @Nonnull AtomicInteger indexed) {
this.indexed = indexed;
@Nullable TransactionStatus transactionStatus) {
this._transactionStatus = transactionStatus;
this._context = context;
this._metadataIds = metadataIds;
this._batchIndex = batchIndex;
this.searchManager = context.getBean(EsSearchManager.class);

batchIndex.add(this);

Expand All @@ -77,7 +71,7 @@ public IndexMetadataTask(@Nonnull ServiceContext context, @Nonnull List<?> metad
}

public void run() {
try {
try (BatchingIndexSubmitter batchingIndexSubmittor = new BatchingIndexSubmitter(_metadataIds.size())) {
_context.setAsThreadLocal();
while (_transactionStatus != null && !_transactionStatus.isCompleted()) {
try {
Expand All @@ -101,13 +95,9 @@ public void run() {
DataManager dataManager = _context.getBean(DataManager.class);
// servlet up so safe to index all metadata that needs indexing
for (Object metadataId : _metadataIds) {
this.indexed.incrementAndGet();
if (this.indexed.compareAndSet(500, 0)) {
searchManager.forceIndexChanges();
}

try {
dataManager.indexMetadata(metadataId.toString(), false);
dataManager.indexMetadata(metadataId.toString(), batchingIndexSubmittor);
} catch (Exception e) {
Log.error(Geonet.INDEX_ENGINE, "Error indexing metadata '" + metadataId + "': " + e.getMessage()
+ "\n" + Util.getStackTrace(e));
Expand All @@ -116,7 +106,6 @@ public void run() {
if (_user != null && _context.getUserSession().getUserId() == null) {
_context.getUserSession().loginAs(_user);
}
searchManager.forceIndexChanges();
} finally {
_batchIndex.remove(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@

package org.fao.geonet.kernel.datamanager;

import java.io.IOException;
import java.util.List;

import org.fao.geonet.domain.AbstractMetadata;
import org.fao.geonet.kernel.search.IndexingMode;
import org.fao.geonet.kernel.search.submission.IIndexSubmitter;
import org.jdom.Element;
import org.springframework.data.jpa.domain.Specification;

Expand All @@ -49,11 +49,6 @@ public interface IMetadataIndexer {
*/
public void init(ServiceContext context, Boolean force) throws Exception;

/**
* Force the index to wait until all changes are processed and the next reader obtained will get the latest data.
*/
void forceIndexChanges() throws IOException;

/**
* Remove the records that matches the specification
*
Expand All @@ -77,13 +72,6 @@ public interface IMetadataIndexer {
*/
void batchIndexInThreadPool(ServiceContext context, List<?> metadataIds);

/**
* Is the platform currently indexing?
*
* @return
*/
boolean isIndexing();

/**
* Index the list of records passed as parameter in order.
*
Expand All @@ -92,7 +80,7 @@ public interface IMetadataIndexer {
*/
void indexMetadata(List<String> metadataIds) throws Exception;

void indexMetadata(String metadataId, boolean forceRefreshReaders, IndexingMode indexingMode) throws Exception;
void indexMetadata(String metadataId, IIndexSubmitter indexSubmittor, IndexingMode indexingMode) throws Exception;

void indexMetadataPrivileges(String uuid, int id) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.fao.geonet.kernel.EditLib;
import org.fao.geonet.kernel.UpdateDatestamp;
import org.fao.geonet.kernel.search.IndexingMode;
import org.fao.geonet.kernel.search.submission.IDeletionSubmitter;
import org.fao.geonet.kernel.search.submission.IIndexSubmitter;
import org.fao.geonet.repository.BatchUpdateQuery;
import org.fao.geonet.repository.PathSpec;
import org.fao.geonet.repository.Updater;
Expand Down Expand Up @@ -68,9 +70,10 @@ public interface IMetadataManager {
*
* @param context
* @param metadataId
* @param deletionSubmittor
* @throws Exception
*/
void deleteMetadata(ServiceContext context, String metadataId) throws Exception;
void deleteMetadata(ServiceContext context, String metadataId, IDeletionSubmitter deletionSubmittor) throws Exception;

/**
* Delete the record with the id metadataId
Expand Down Expand Up @@ -147,12 +150,12 @@ String insertMetadata(ServiceContext context, String schema, Element metadataXml
* @param updateFixedInfo
* @param updateDatestamp
* @param fullRightsForGroup
* @param forceRefreshReaders
* @param indexSubmittor
* @return
* @throws Exception
*/
AbstractMetadata insertMetadata(ServiceContext context, AbstractMetadata newMetadata, Element metadataXml, IndexingMode indexingMode,
boolean updateFixedInfo, UpdateDatestamp updateDatestamp, boolean fullRightsForGroup, boolean forceRefreshReaders)
boolean updateFixedInfo, UpdateDatestamp updateDatestamp, boolean fullRightsForGroup, IIndexSubmitter indexSubmittor)
throws Exception;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
import org.fao.geonet.kernel.search.EsSearchManager;
import org.fao.geonet.kernel.search.IndexFields;
import org.fao.geonet.kernel.search.IndexingMode;
import org.fao.geonet.kernel.search.submission.batch.BatchingDeletionSubmitter;
import org.fao.geonet.kernel.search.submission.batch.BatchingIndexSubmitter;
import org.fao.geonet.kernel.search.submission.IIndexSubmitter;
import org.fao.geonet.kernel.setting.SettingManager;
import org.fao.geonet.kernel.setting.Settings;
import org.fao.geonet.repository.*;
Expand All @@ -69,13 +72,11 @@
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.interceptor.TransactionAspectSupport;

import java.io.IOException;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import static org.fao.geonet.resources.Resources.DEFAULT_LOGO_EXTENSION;

Expand Down Expand Up @@ -147,11 +148,6 @@ public void setMetadataManager(IMetadataManager metadataManager) {
Set<String> indexing = new HashSet<String>();
Set<IndexMetadataTask> batchIndex = ConcurrentHashMap.newKeySet();

@Override
public void forceIndexChanges() throws IOException {
searchManager.forceIndexChanges();
}

@Override
public int batchDeleteMetadataAndUpdateIndex(Specification<? extends AbstractMetadata> specification)
throws Exception {
Expand All @@ -173,29 +169,31 @@ public int batchDeleteMetadataAndUpdateIndex(Specification<? extends AbstractMet
// searchManager.delete(metadataToDelete.stream().map(input -> Integer.toString(input.getId())).collect(Collectors.toList()));
// metadataManager.deleteAll(specification);
// So delete one by one even if slower
metadataToDelete.forEach(md -> {
try {
// Extract information for RecordDeletedEvent
LinkedHashMap<String, String> titles = metadataUtils.extractTitles(Integer.toString(md.getId()));
UserSession userSession = ServiceContext.get().getUserSession();
String xmlBefore = md.getData();

store.delResources(ServiceContext.get(), md.getUuid());
metadataManager.deleteMetadata(ServiceContext.get(), String.valueOf(md.getId()));

// Trigger RecordDeletedEvent
new RecordDeletedEvent(md.getId(), md.getUuid(), titles, userSession.getUserIdAsInt(), xmlBefore).publish(ApplicationContextHolder.get());
} catch (Exception e) {
Log.warning(Geonet.DATA_MANAGER, String.format(

"Error during removal of metadata %s part of batch delete operation. " +
"This error may create a ghost record (ie. not in the index " +
"but still present in the database). " +
"You can reindex the catalogue to see it again. " +
"Error was: %s.", md.getUuid(), e.getMessage()));
e.printStackTrace();
}
});
try (BatchingDeletionSubmitter submitter = new BatchingDeletionSubmitter(metadataToDelete.size())) {
metadataToDelete.forEach(md -> {
try {
// Extract information for RecordDeletedEvent
LinkedHashMap<String, String> titles = metadataUtils.extractTitles(Integer.toString(md.getId()));
UserSession userSession = ServiceContext.get().getUserSession();
String xmlBefore = md.getData();

store.delResources(ServiceContext.get(), md.getUuid());
metadataManager.deleteMetadata(ServiceContext.get(), String.valueOf(md.getId()), submitter);

// Trigger RecordDeletedEvent
new RecordDeletedEvent(md.getId(), md.getUuid(), titles, userSession.getUserIdAsInt(), xmlBefore).publish(ApplicationContextHolder.get());
} catch (Exception e) {
Log.warning(Geonet.DATA_MANAGER, String.format(

"Error during removal of metadata %s part of batch delete operation. " +
"This error may create a ghost record (ie. not in the index " +
"but still present in the database). " +
"You can reindex the catalogue to see it again. " +
"Error was: %s.", md.getUuid(), e.getMessage()));
e.printStackTrace();
}
});
}

return metadataToDelete.size();
}
Expand Down Expand Up @@ -268,7 +266,6 @@ public void batchIndexInThreadPool(ServiceContext context, List<?> metadataIds)
Log.debug(Geonet.INDEX_ENGINE, "Indexing " + metadataIds.size() + " records.");
Log.debug(Geonet.INDEX_ENGINE, metadataIds.toString());
}
AtomicInteger numIndexedTracker = new AtomicInteger();
while (index < metadataIds.size()) {
int start = index;
int count = Math.min(perThread, metadataIds.size() - start);
Expand All @@ -285,29 +282,26 @@ public void batchIndexInThreadPool(ServiceContext context, List<?> metadataIds)
}

// create threads to process this chunk of ids
Runnable worker = new IndexMetadataTask(context, subList, batchIndex, transactionStatus, numIndexedTracker);
Runnable worker = new IndexMetadataTask(context, subList, batchIndex, transactionStatus);
executor.execute(worker);
index += count;
}

executor.shutdown();
}

@Override
public boolean isIndexing() {
return searchManager.isIndexing();
}

@Override
public void indexMetadata(final List<String> metadataIds) throws Exception {
for (String metadataId : metadataIds) {
indexMetadata(metadataId, true, IndexingMode.full);
try (BatchingIndexSubmitter indexSubmittor = new BatchingIndexSubmitter(metadataIds.size())) {
for (String metadataId : metadataIds) {
indexMetadata(metadataId, indexSubmittor, IndexingMode.full);
}
}
}

@Override
public void indexMetadata(final String metadataId,
final boolean forceRefreshReaders,
final IIndexSubmitter indexSubmittor,
final IndexingMode indexingMode)
throws Exception {
AbstractMetadata fullMd;
Expand Down Expand Up @@ -388,7 +382,7 @@ public void indexMetadata(final String metadataId,
"error",
Map.of("record", metadataId, "schema", schema)));
searchManager.index(null, md, indexKey, fields, metadataType,
forceRefreshReaders, indexingMode);
indexSubmittor, indexingMode);
Log.error(Geonet.DATA_MANAGER, String.format(
"Record %s / Schema '%s' is not registered in this catalog. Install it or remove those records. Record is indexed indexing error flag.",
metadataId, schema));
Expand Down Expand Up @@ -533,7 +527,7 @@ public void indexMetadata(final String metadataId,
}

searchManager.index(schemaManager.getSchemaDir(schema), md, indexKey, fields, metadataType,
forceRefreshReaders, indexingMode);
indexSubmittor, indexingMode);
}
} catch (Exception x) {
Log.error(Geonet.DATA_MANAGER, "The metadata document index with id=" + metadataId
Expand Down
Loading
Loading