Skip to content

Commit

Permalink
Merge pull request #129 from NaturalHistoryMuseum/josh/datastore_get_…
Browse files Browse the repository at this point in the history
…resource_versions_perf

Increase performance in datastore_get_resource_versions action
  • Loading branch information
jrdh authored Oct 10, 2023
2 parents a69cdbd + d66731b commit 1a06240
Showing 1 changed file with 29 additions and 13 deletions.
42 changes: 29 additions & 13 deletions ckanext/versioned_datastore/logic/actions/extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from ckan.plugins import toolkit
from splitgill.search import create_version_query
from splitgill.utils import to_timestamp
from elasticsearch_dsl import Search
from splitgill.utils import to_timestamp, chunk_iterator
from elasticsearch_dsl import Search, MultiSearch

from .meta import help, schema
from ckantools.decorators import action
Expand Down Expand Up @@ -46,28 +46,44 @@ def datastore_get_resource_versions(
"""
Retrieves all the versions of the given resource when under the given search. Note
that the schema used for this action is the same as the datastore_search schema. The
return is a dict including the version timestamp, the number of records modified in
the version and the total records at the version.
return is a list of dicts each of which includes the version timestamp, the number
of records modified in the version and the total records at the version.
:param resource_id: the id of the resource to examine
:param context: the context dict from the action call
:param data_dict: the data_dict from the action call
:param original_data_dict: the data_dict before it was validated
:return:
:return: a list of dicts
"""
original_data_dict, data_dict, version, search = create_search(
context, data_dict, original_data_dict
)
index_name = prefix_resource(resource_id)

data = common.SEARCH_HELPER.get_index_version_counts(index_name, search=search)

search = search.using(common.ES_CLIENT).index(index_name)[0:0]
for result in data:
version = result['version']
count = search.filter(create_version_query(version)).count()
result['count'] = count
return data
# this gives us every version in the index, plus the number of changes in that
# version (changes include new records and changed records)
counts = common.SEARCH_HELPER.get_index_version_counts(index_name, search=search)

# each of the dicts in the counts list above contains the version and the number of
# changes, but not the number of records in that version, so we want to add this to
# the dicts. To do this we run msearches against Elasticsearch so that we can
# batch up the searches and get better performance (there could be thousands of
# versions to count, after all). This variable simply defines how many searches to
# do in each msearch batch.
multisearch_chunk_size = 100

for details_chunk in chunk_iterator(counts, multisearch_chunk_size):
multisearch = MultiSearch(using=common.ES_CLIENT, index=index_name)
for details in details_chunk:
multisearch = multisearch.add(
Search()[0:0].filter(create_version_query(details["version"]))
)
results = multisearch.execute()
# update the count details we got from splitgill with the actual record count
for detail, result in zip(details_chunk, results):
detail["count"] = result.hits.total

return counts


@action(
Expand Down

0 comments on commit 1a06240

Please sign in to comment.