diff --git a/geospaas_processing/tasks/syntool.py b/geospaas_processing/tasks/syntool.py index 38f93e26..29a85808 100644 --- a/geospaas_processing/tasks/syntool.py +++ b/geospaas_processing/tasks/syntool.py @@ -4,16 +4,18 @@ import shutil import subprocess import tempfile +from contextlib import ExitStack from pathlib import Path import celery import geospaas_processing.converters.syntool +import geospaas_processing.utils as utils from geospaas.catalog.models import Dataset from geospaas_processing.tasks import lock_dataset_files, FaultTolerantTask, WORKING_DIRECTORY from ..converters.syntool.converter import SyntoolConversionManager from ..models import ProcessingResult -from . import app +from . import app, DATASET_LOCK_PREFIX try: from . import core @@ -80,36 +82,35 @@ def convert(self, args, **kwargs): # pylint: disable=unused-argument def compare_profiles(self, args, **kwargs): """Generate side-by-side profiles of a model and in-situ data. """ - model_id = args[0] - profiles_lookups = kwargs['profiles_lookups'] - - model_dataset = Dataset.objects.get(id=model_id) - profiles = Dataset.objects.filter( - **profiles_lookups, - time_coverage_start__lte=model_dataset.time_coverage_end, - time_coverage_end__gte=model_dataset.time_coverage_start, - geographic_location__geometry__contained=model_dataset.geographic_location.geometry, - ) + model_id, (model_path,) = args[0] + profiles = args[1] # iterable of (id, (path,)) tuples - _, (model_path, ) = core.download((model_id,)) - profiles_paths = [] - for profile in profiles: - profiles_paths.append(core.download((profile.id,))[1][0]) working_dir = Path(WORKING_DIRECTORY) output_dir = Path(os.getenv('GEOSPAAS_PROCESSING_SYNTOOL_RESULTS_DIR', WORKING_DIRECTORY)) - with tempfile.TemporaryDirectory() as tmp_dir: + + locks = [utils.redis_lock(f"{DATASET_LOCK_PREFIX}{model_id}", self.request.id)] + for profile_id, _ in profiles: + locks.append(utils.redis_lock(f"{DATASET_LOCK_PREFIX}{profile_id}", self.request.id)) + + with tempfile.TemporaryDirectory() as tmp_dir, \ + ExitStack() as stack: + for lock in locks: # lock all model and profile datasets + stack.enter_context(lock) + command = [ 'python2', - str(Path(geospaas_processing.converters.syntool.__file__).parent / 'extra_readers' / 'compare_model_argo.py'), + str(Path(geospaas_processing.converters.syntool.__file__).parent + / 'extra_readers' + / 'compare_model_argo.py'), str(working_dir / model_path), - ','.join(str(working_dir / p) for p in profiles_paths), + ','.join(str(working_dir / p[1][0]) for p in profiles), tmp_dir ] try: process = subprocess.run(command, capture_output=True) except subprocess.CalledProcessError as error: logger.error("Could not generate comparison profiles for dataset %s\nstdout: %s\nstderr: %s", - model_dataset.entry_id, + model_id, process.stdout, process.stderr)