From 767194b7e9a4732c692d063ec752b35da2fdecf5 Mon Sep 17 00:00:00 2001 From: Tobias Hotz Date: Mon, 11 Nov 2024 16:58:20 +0100 Subject: [PATCH] Allow delete by query to be "batched" by running them async --- .../datamanager/base/BaseMetadataManager.java | 12 ++++++---- .../draft/DraftMetadataManager.java | 2 +- .../datamanager/draft/DraftMetadataUtils.java | 2 +- .../geonet/kernel/search/EsSearchManager.java | 21 ++++++++++++---- .../geonet/kernel/search/ISearchManager.java | 2 +- .../submission/DirectDeletionSubmittor.java | 14 ++++++++++- .../search/submission/IDeletionSubmittor.java | 4 +++- .../batch/BatchingDeletionSubmittor.java | 23 +++++++++++++++++- .../org/fao/geonet/index/es/EsRestClient.java | 18 ++------------ .../worker/EsWFSFeatureIndexer.java | 24 ++++++++++++++----- 10 files changed, 85 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/org/fao/geonet/kernel/datamanager/base/BaseMetadataManager.java b/core/src/main/java/org/fao/geonet/kernel/datamanager/base/BaseMetadataManager.java index 83984d3bbf8..055111d8627 100644 --- a/core/src/main/java/org/fao/geonet/kernel/datamanager/base/BaseMetadataManager.java +++ b/core/src/main/java/org/fao/geonet/kernel/datamanager/base/BaseMetadataManager.java @@ -252,9 +252,11 @@ public void synchronizeDbWithIndex(ServiceContext context) throws Exception { } // remove from index metadata not in DBMS - for (String id : docs.keySet()) { - getSearchManager().deleteByQuery(String.format("+id:%s", id)); - LOGGER_DATA_MANAGER.debug("- removed record ({}) from index", id); + try (BatchingDeletionSubmittor submittor = new BatchingDeletionSubmittor()) { + for (String id : docs.keySet()) { + getSearchManager().deleteByQuery(String.format("+id:%s", id), submittor); + LOGGER_DATA_MANAGER.debug("- removed record ({}) from index", id); + } } } @@ -342,7 +344,7 @@ public void deleteMetadata(ServiceContext context, String metadataId, IDeletionS // --- update search criteria getSearchManager().deleteByUuid(findOne.getUuid(), submittor); } else { - getSearchManager().deleteByQuery(String.format("+id:%s", metadataId)); + getSearchManager().deleteByQuery(String.format("+id:%s", metadataId), submittor); } } @@ -384,7 +386,7 @@ public void purgeMetadata(ServiceContext context, String metadataId, boolean wit @Override public void deleteMetadataGroup(ServiceContext context, String metadataId) throws Exception { deleteMetadataFromDB(context, metadataId); - getSearchManager().deleteByQuery(String.format("+id:%s", metadataId)); + getSearchManager().deleteByQuery(String.format("+id:%s", metadataId), DirectDeletionSubmittor.INSTANCE); } /** diff --git a/core/src/main/java/org/fao/geonet/kernel/datamanager/draft/DraftMetadataManager.java b/core/src/main/java/org/fao/geonet/kernel/datamanager/draft/DraftMetadataManager.java index 0cf546d36d0..54ad4f4f50e 100644 --- a/core/src/main/java/org/fao/geonet/kernel/datamanager/draft/DraftMetadataManager.java +++ b/core/src/main/java/org/fao/geonet/kernel/datamanager/draft/DraftMetadataManager.java @@ -89,7 +89,7 @@ public void deleteMetadata(ServiceContext context, String metadataId, IDeletionS deleteMetadataFromDB(context, metadataId); getSearchManager().deleteByUuid(findOne.getUuid(), submittor); } else { - getSearchManager().deleteByQuery(String.format("+id:%s", metadataId)); + getSearchManager().deleteByQuery(String.format("+id:%s", metadataId), submittor); } } diff --git a/core/src/main/java/org/fao/geonet/kernel/datamanager/draft/DraftMetadataUtils.java b/core/src/main/java/org/fao/geonet/kernel/datamanager/draft/DraftMetadataUtils.java index a3ec98a249d..6bbadffa383 100644 --- a/core/src/main/java/org/fao/geonet/kernel/datamanager/draft/DraftMetadataUtils.java +++ b/core/src/main/java/org/fao/geonet/kernel/datamanager/draft/DraftMetadataUtils.java @@ -653,7 +653,7 @@ public void cancelEditingSession(ServiceContext context, String id) throws Excep // --- remove metadata xmlSerializer.delete(id, ServiceContext.get()); - searchManager.deleteByQuery(String.format("+id:%s", id)); + searchManager.deleteByQuery(String.format("+id:%s", id), DirectDeletionSubmittor.INSTANCE); // Unset METADATA_EDITING_CREATED_DRAFT flag context.getUserSession().removeProperty(Geonet.Session.METADATA_EDITING_CREATED_DRAFT); diff --git a/core/src/main/java/org/fao/geonet/kernel/search/EsSearchManager.java b/core/src/main/java/org/fao/geonet/kernel/search/EsSearchManager.java index 6cbd9512d0d..391620988b4 100644 --- a/core/src/main/java/org/fao/geonet/kernel/search/EsSearchManager.java +++ b/core/src/main/java/org/fao/geonet/kernel/search/EsSearchManager.java @@ -55,6 +55,7 @@ import org.fao.geonet.kernel.datamanager.IMetadataIndexer; import org.fao.geonet.kernel.datamanager.IMetadataUtils; import org.fao.geonet.kernel.search.index.OverviewIndexFieldUpdater; +import org.fao.geonet.kernel.search.submission.DirectDeletionSubmittor; import org.fao.geonet.kernel.search.submission.IDeletionSubmittor; import org.fao.geonet.kernel.search.submission.batch.BatchingIndexSubmittor; import org.fao.geonet.kernel.search.submission.IIndexSubmittor; @@ -479,6 +480,18 @@ public void handleDeletionResponse(BulkResponse bulkResponse, List docum } } + public void handleDeletionResponse(DeleteByQueryResponse deleteByQueryResponse, String query) { + if (!deleteByQueryResponse.failures().isEmpty()) { + StringBuilder stringBuilder = new StringBuilder(); + + deleteByQueryResponse.failures().forEach(f -> stringBuilder.append(f.toString())); + + throw new RuntimeException(String.format( + "Error during removal of query %s. Errors are '%s'.", query, stringBuilder.toString() + )); + } + } + private void checkIndexResponse(BulkResponse bulkItemResponses, Map documents) throws IOException { if (bulkItemResponses.errors()) { @@ -810,7 +823,7 @@ public Map getFieldsValues(String id, Set fields, String public void clearIndex() throws Exception { - client.deleteByQuery(defaultIndex, "*:*"); + deleteByQuery("*:*", DirectDeletionSubmittor.INSTANCE); } static ImmutableSet docsChangeIncludedFields; @@ -868,13 +881,13 @@ public ISODate getDocChangeDate(String mdId) throws Exception { } @Override - public void deleteByQuery(String txt) throws Exception { - client.deleteByQuery(defaultIndex, txt); + public void deleteByQuery(String query, IDeletionSubmittor submittor) throws Exception { + submittor.submitQueryToIndex(query, this); } @Override public void deleteByUuid(String uuid, IDeletionSubmittor submittor) throws Exception { - submittor.submitToIndex(uuid, this); + submittor.submitUUIDToIndex(uuid, this); } public long getNumDocs(String query) throws Exception { diff --git a/core/src/main/java/org/fao/geonet/kernel/search/ISearchManager.java b/core/src/main/java/org/fao/geonet/kernel/search/ISearchManager.java index 324daf51749..b1fb7b97772 100644 --- a/core/src/main/java/org/fao/geonet/kernel/search/ISearchManager.java +++ b/core/src/main/java/org/fao/geonet/kernel/search/ISearchManager.java @@ -73,7 +73,7 @@ boolean rebuildIndex(ServiceContext context, /** * deletes a document by a query. */ - void deleteByQuery(String query) throws Exception; + void deleteByQuery(String query, IDeletionSubmittor submittor) throws Exception; /** * deletes a document by its uuid. diff --git a/core/src/main/java/org/fao/geonet/kernel/search/submission/DirectDeletionSubmittor.java b/core/src/main/java/org/fao/geonet/kernel/search/submission/DirectDeletionSubmittor.java index eb1c1546dc5..f99073b575e 100644 --- a/core/src/main/java/org/fao/geonet/kernel/search/submission/DirectDeletionSubmittor.java +++ b/core/src/main/java/org/fao/geonet/kernel/search/submission/DirectDeletionSubmittor.java @@ -2,6 +2,8 @@ import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest; +import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse; import org.fao.geonet.index.es.EsRestClient; import org.fao.geonet.kernel.search.EsSearchManager; @@ -15,7 +17,7 @@ public class DirectDeletionSubmittor implements IDeletionSubmittor { private DirectDeletionSubmittor() {} @Override - public void submitToIndex(String uuid, EsSearchManager searchManager) throws IOException { + public void submitUUIDToIndex(String uuid, EsSearchManager searchManager) throws IOException { EsRestClient restClient = searchManager.getClient(); List documents = Collections.singletonList(uuid); @@ -24,4 +26,14 @@ public void submitToIndex(String uuid, EsSearchManager searchManager) throws IOE searchManager.handleDeletionResponse(bulkItemResponses, documents); } + + @Override + public void submitQueryToIndex(String query, EsSearchManager searchManager) throws IOException { + EsRestClient restClient = searchManager.getClient(); + + DeleteByQueryRequest deleteByQueryRequest = restClient.buildDeleteByQuery(searchManager.getDefaultIndex(), query); + final DeleteByQueryResponse deleteByQueryResponse = restClient.getClient().deleteByQuery(deleteByQueryRequest); + + searchManager.handleDeletionResponse(deleteByQueryResponse, query); + } } diff --git a/core/src/main/java/org/fao/geonet/kernel/search/submission/IDeletionSubmittor.java b/core/src/main/java/org/fao/geonet/kernel/search/submission/IDeletionSubmittor.java index 30384dabff2..a5e744bacc4 100644 --- a/core/src/main/java/org/fao/geonet/kernel/search/submission/IDeletionSubmittor.java +++ b/core/src/main/java/org/fao/geonet/kernel/search/submission/IDeletionSubmittor.java @@ -6,5 +6,7 @@ public interface IDeletionSubmittor { - void submitToIndex(String uuid, EsSearchManager searchManager) throws IOException; + void submitUUIDToIndex(String uuid, EsSearchManager searchManager) throws IOException; + + void submitQueryToIndex(String query, EsSearchManager searchManager) throws IOException; } diff --git a/core/src/main/java/org/fao/geonet/kernel/search/submission/batch/BatchingDeletionSubmittor.java b/core/src/main/java/org/fao/geonet/kernel/search/submission/batch/BatchingDeletionSubmittor.java index 10387ef24f6..ffa97abad20 100644 --- a/core/src/main/java/org/fao/geonet/kernel/search/submission/batch/BatchingDeletionSubmittor.java +++ b/core/src/main/java/org/fao/geonet/kernel/search/submission/batch/BatchingDeletionSubmittor.java @@ -1,6 +1,8 @@ package org.fao.geonet.kernel.search.submission.batch; import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest; +import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse; import org.fao.geonet.index.es.EsRestClient; import org.fao.geonet.kernel.search.EsSearchManager; import org.fao.geonet.kernel.search.submission.IDeletionSubmittor; @@ -30,6 +32,15 @@ private void deleteDocumentsFromIndex(List toDelete) { .thenAcceptAsync(bulkItemResponses -> searchManager.handleDeletionResponse(bulkItemResponses, toDelete)); queueFuture(currentIndexFuture); } + + private void deleteDocumentByQuery(String query) { + EsRestClient restClient = searchManager.getClient(); + + DeleteByQueryRequest deleteByQueryRequest = restClient.buildDeleteByQuery(searchManager.getDefaultIndex(), query); + final CompletableFuture deleteByQueryFuture = restClient.getAsyncClient().deleteByQuery(deleteByQueryRequest) + .thenAcceptAsync(response -> searchManager.handleDeletionResponse(response, query)); + queueFuture(deleteByQueryFuture); + } } public BatchingDeletionSubmittor() { @@ -41,7 +52,7 @@ public BatchingDeletionSubmittor(int estimatedTotalSize) { } @Override - public void submitToIndex(String uuid, EsSearchManager searchManager) { + public void submitUUIDToIndex(String uuid, EsSearchManager searchManager) { if (state.closed) { throw new IllegalStateException("Attempting to use a closed " + this.getClass().getSimpleName()); } @@ -55,4 +66,14 @@ public void submitToIndex(String uuid, EsSearchManager searchManager) { state.deleteDocumentsFromIndex(toDelete); } } + + @Override + public void submitQueryToIndex(String query, EsSearchManager searchManager) { + if (state.closed) { + throw new IllegalStateException("Attempting to use a closed " + this.getClass().getSimpleName()); + } + + state.searchManager = searchManager; + state.deleteDocumentByQuery(query); + } } diff --git a/index/src/main/java/org/fao/geonet/index/es/EsRestClient.java b/index/src/main/java/org/fao/geonet/index/es/EsRestClient.java index 919deac8438..12266a5af55 100644 --- a/index/src/main/java/org/fao/geonet/index/es/EsRestClient.java +++ b/index/src/main/java/org/fao/geonet/index/es/EsRestClient.java @@ -250,27 +250,13 @@ public BulkRequest buildDeleteBulkRequest(String index, List deletionUUI return requestBuilder.build(); } - public void deleteByQuery(String index, String query) throws IOException { + public DeleteByQueryRequest buildDeleteByQuery(String index, String query) { checkActivated(); - DeleteByQueryRequest request = DeleteByQueryRequest.of( + return DeleteByQueryRequest.of( b -> b.index(index) .q(query) .refresh(true)); - - final DeleteByQueryResponse deleteByQueryResponse = - client.deleteByQuery(request); - - - if (!deleteByQueryResponse.failures().isEmpty()) { - StringBuilder stringBuilder = new StringBuilder(); - - deleteByQueryResponse.failures().forEach(f -> stringBuilder.append(f.toString())); - - throw new IOException(String.format( - "Error during removal. Errors are '%s'.", stringBuilder.toString() - )); - } } diff --git a/workers/wfsfeature-harvester/src/main/java/org/fao/geonet/harvester/wfsfeatures/worker/EsWFSFeatureIndexer.java b/workers/wfsfeature-harvester/src/main/java/org/fao/geonet/harvester/wfsfeatures/worker/EsWFSFeatureIndexer.java index dcc1b25cbd1..0d50c34dc44 100644 --- a/workers/wfsfeature-harvester/src/main/java/org/fao/geonet/harvester/wfsfeatures/worker/EsWFSFeatureIndexer.java +++ b/workers/wfsfeature-harvester/src/main/java/org/fao/geonet/harvester/wfsfeatures/worker/EsWFSFeatureIndexer.java @@ -26,10 +26,7 @@ import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester; import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener; import co.elastic.clients.elasticsearch._types.Result; -import co.elastic.clients.elasticsearch.core.BulkRequest; -import co.elastic.clients.elasticsearch.core.BulkResponse; -import co.elastic.clients.elasticsearch.core.IndexRequest; -import co.elastic.clients.elasticsearch.core.IndexResponse; +import co.elastic.clients.elasticsearch.core.*; import co.elastic.clients.util.BinaryData; import co.elastic.clients.util.ContentType; import com.fasterxml.jackson.core.JsonProcessingException; @@ -224,11 +221,11 @@ public void deleteFeatures(String url, String typeName, EsRestClient client) { new Object[]{url, typeName, index, indexType}); try { long begin = System.currentTimeMillis(); - client.deleteByQuery(index, String.format("+featureTypeId:\"%s\"", getIdentifier(url, typeName))); + deleteByQuery(String.format("+featureTypeId:\"%s\"", getIdentifier(url, typeName))); LOGGER.info(" Features deleted in {} ms.", System.currentTimeMillis() - begin); begin = System.currentTimeMillis(); - client.deleteByQuery(index, String.format("+id:\"%s\"", + deleteByQuery(String.format("+id:\"%s\"", getIdentifier(url, typeName))); LOGGER.info(" Report deleted in {} ms.", System.currentTimeMillis() - begin); @@ -238,6 +235,21 @@ public void deleteFeatures(String url, String typeName, EsRestClient client) { } } + private void deleteByQuery(String query) throws IOException { + DeleteByQueryRequest deleteByQueryRequest = client.buildDeleteByQuery(index, query); + DeleteByQueryResponse deleteByQueryResponse = client.getClient().deleteByQuery(deleteByQueryRequest); + if (!deleteByQueryResponse.failures().isEmpty()) { + StringBuilder stringBuilder = new StringBuilder(); + + deleteByQueryResponse.failures().forEach(f -> stringBuilder.append(f.toString())); + + throw new RuntimeException(String.format( + "Error during removal of query %s. Errors are '%s'.", query, stringBuilder.toString() + )); + } + + } + interface TitleResolver { void setTitle(ObjectNode objectNode, SimpleFeature simpleFeature); }