diff --git a/.github/workflows/build-docker.yml b/.github/workflows/build-docker.yml
index b9285031..688ed136 100644
--- a/.github/workflows/build-docker.yml
+++ b/.github/workflows/build-docker.yml
@@ -5,7 +5,7 @@ concurrency: ${{ github.workflow }}-${{ github.ref }}
 on:
   push:
     branches:
-      - "*"
+      - "**"
     tags:
      - "*"
   pull_request:
@@ -47,6 +47,11 @@ jobs:
         uses: docker/metadata-action@v4
         with:
           images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
+          tags: |
+            type=sha
+            type=ref,event=branch
+            type=ref,event=pr
+            type=semver,pattern={{version}}
 
       - name: Build and push Docker image
         id: build-and-push
diff --git a/CHANGES.md b/CHANGES.md
index 5e7f5e27..d3be4c8c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,7 @@
+1.19.1 (2024-11-05)
+-------------------
+- Added extra logging and try/except blocks to catch frames that silently bypass processing
+
 1.19.0 (2024-10-16)
 -------------------
 - Added the ability to read fits files that are pre downloaded and are already in memory
diff --git a/banzai/celery.py b/banzai/celery.py
index c5647764..28637130 100644
--- a/banzai/celery.py
+++ b/banzai/celery.py
@@ -180,6 +180,7 @@ def process_image(file_info: dict, runtime_context: dict):
     :param file_info: Body of queue message: dict
     :param runtime_context: Context object with runtime environment info
     """
+    logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')})
     runtime_context = Context(runtime_context)
     try:
         if realtime_utils.need_to_process_image(file_info, runtime_context):
diff --git a/banzai/main.py b/banzai/main.py
index 34d1a827..bd1c1fd5 100755
--- a/banzai/main.py
+++ b/banzai/main.py
@@ -10,6 +10,7 @@
 import argparse
 import os.path
 import logging
+import traceback
 
 from kombu import Exchange, Connection, Queue
 from kombu.mixins import ConsumerMixin
@@ -47,7 +48,13 @@ def get_consumers(self, Consumer, channel):
         return [consumer]
 
     def on_message(self, body, message):
-        instrument = LCOFrameFactory.get_instrument_from_header(body, self.runtime_context.db_address)
+        logger.info('Received message', extra_tags={'filename': body['filename']})
+        try:
+            instrument = LCOFrameFactory.get_instrument_from_header(body, self.runtime_context.db_address)
+        except Exception:
+            logger.error(f'Could not get instrument from header. {traceback.format_exc()}', extra_tags={'filename': body['filename']})
+            message.ack()
+            return
         if instrument is None or instrument.nx is None:
             queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME
         elif instrument.nx * instrument.ny > self.runtime_context.LARGE_WORKER_THRESHOLD:
diff --git a/banzai/utils/realtime_utils.py b/banzai/utils/realtime_utils.py
index 639f5146..2f6ca001 100644
--- a/banzai/utils/realtime_utils.py
+++ b/banzai/utils/realtime_utils.py
@@ -1,4 +1,5 @@
 import os
+
 from banzai import dbs
 from banzai.utils import file_utils, import_utils, image_utils
 from banzai.data import HeaderOnly
@@ -48,6 +49,7 @@ def need_to_process_image(file_info, context):
 
     if 'frameid' in file_info:
         if 'version_set' not in file_info:
+            logger.info("Version set not available in file_info", extra_tags={"filename": file_info['filename']})
             return True
         checksum = file_info['version_set'][0].get('md5')
         filename = file_info['filename']
@@ -57,7 +59,7 @@ def need_to_process_image(file_info, context):
 
     logger.info("Checking if file needs to be processed", extra_tags={"filename": filename})
 
     if not (filename.endswith('.fits') or filename.endswith('.fits.fz')):
-        logger.debug("Filename does not have a .fits extension, stopping reduction",
+        logger.error("Filename does not have a .fits extension, stopping reduction",
                      extra_tags={"filename": filename})
         return False
@@ -70,6 +72,7 @@ def need_to_process_image(file_info, context):
         # Check the md5.
         # Reset the number of tries if the file has changed on disk/in s3
         if image.checksum != checksum:
+            logger.info('File has changed on disk. Resetting success flags and tries', extra_tags={'filename': filename})
             need_to_process = True
             image.checksum = checksum
             image.tries = 0
@@ -78,6 +81,7 @@ def need_to_process_image(file_info, context):
 
         # Check if we need to try again
         elif image.tries < context.max_tries and not image.success:
+            logger.info('File has not been successfully processed yet. Trying again.', extra_tags={'filename': filename})
             need_to_process = True
 
         dbs.commit_processed_image(image, context.db_address)
@@ -88,7 +92,11 @@ def need_to_process_image(file_info, context):
     factory = import_utils.import_attribute(context.FRAME_FACTORY)()
     test_image = factory.observation_frame_class(hdu_list=[HeaderOnly(file_info, name='')],
                                                  file_path=file_info['filename'])
-    test_image.instrument = factory.get_instrument_from_header(file_info, db_address=context.db_address)
+    try:
+        test_image.instrument = factory.get_instrument_from_header(file_info, db_address=context.db_address)
+    except Exception:
+        logger.error(f'Issue getting instrument from header. {logs.format_exception()}', extra_tags={'filename': filename})
+        need_to_process = False
     if image_utils.get_reduction_level(test_image.meta) != '00':
         logger.error('Image has nonzero reduction level. Aborting.', extra_tags={'filename': filename})
         need_to_process = False