Skip to content

Commit

Permalink
remove downloads from compare_profiles task
Browse files Browse the repository at this point in the history
  • Loading branch information
aperrin66 committed Sep 17, 2024
1 parent 0388340 commit e4abb6d
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions geospaas_processing/tasks/syntool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
import shutil
import subprocess
import tempfile
from contextlib import ExitStack
from pathlib import Path

import celery

import geospaas_processing.converters.syntool
import geospaas_processing.utils as utils
from geospaas.catalog.models import Dataset
from geospaas_processing.tasks import lock_dataset_files, FaultTolerantTask, WORKING_DIRECTORY
from ..converters.syntool.converter import SyntoolConversionManager
from ..models import ProcessingResult
from . import app
from . import app, DATASET_LOCK_PREFIX

try:
from . import core
Expand Down Expand Up @@ -80,36 +82,35 @@ def convert(self, args, **kwargs): # pylint: disable=unused-argument
def compare_profiles(self, args, **kwargs):
"""Generate side-by-side profiles of a model and in-situ data.
"""
model_id = args[0]
profiles_lookups = kwargs['profiles_lookups']

model_dataset = Dataset.objects.get(id=model_id)
profiles = Dataset.objects.filter(
**profiles_lookups,
time_coverage_start__lte=model_dataset.time_coverage_end,
time_coverage_end__gte=model_dataset.time_coverage_start,
geographic_location__geometry__contained=model_dataset.geographic_location.geometry,
)
model_id, (model_path,) = args[0]
profiles = args[1] # iterable of (id, (path,)) tuples

_, (model_path, ) = core.download((model_id,))
profiles_paths = []
for profile in profiles:
profiles_paths.append(core.download((profile.id,))[1][0])
working_dir = Path(WORKING_DIRECTORY)
output_dir = Path(os.getenv('GEOSPAAS_PROCESSING_SYNTOOL_RESULTS_DIR', WORKING_DIRECTORY))
with tempfile.TemporaryDirectory() as tmp_dir:

locks = [utils.redis_lock(f"{DATASET_LOCK_PREFIX}{model_id}", self.request.id)]
for profile_id, _ in profiles:
locks.append(utils.redis_lock(f"{DATASET_LOCK_PREFIX}{profile_id}", self.request.id))

with tempfile.TemporaryDirectory() as tmp_dir, \
ExitStack() as stack:
for lock in locks: # lock all model and profile datasets
stack.enter_context(lock)

command = [
'python2',
str(Path(geospaas_processing.converters.syntool.__file__).parent / 'extra_readers' / 'compare_model_argo.py'),
str(Path(geospaas_processing.converters.syntool.__file__).parent
/ 'extra_readers'
/ 'compare_model_argo.py'),
str(working_dir / model_path),
','.join(str(working_dir / p) for p in profiles_paths),
','.join(str(working_dir / p[1][0]) for p in profiles),
tmp_dir
]
try:
process = subprocess.run(command, capture_output=True)
except subprocess.CalledProcessError as error:
logger.error("Could not generate comparison profiles for dataset %s\nstdout: %s\nstderr: %s",
model_dataset.entry_id,
model_id,
process.stdout,
process.stderr)

Expand Down

0 comments on commit e4abb6d

Please sign in to comment.