From 2a098bba3f1b742df55c57aedd3962fea0a4e68a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Brunner?= Date: Sun, 1 Dec 2024 21:32:04 +0100 Subject: [PATCH] Refactor to reduce cognitive complexity --- tilecloud_chain/controller.py | 203 +++++++++++++++++++--------------- 1 file changed, 114 insertions(+), 89 deletions(-) diff --git a/tilecloud_chain/controller.py b/tilecloud_chain/controller.py index 8c22f86cf..d354658e5 100644 --- a/tilecloud_chain/controller.py +++ b/tilecloud_chain/controller.py @@ -16,6 +16,7 @@ from urllib.parse import urlencode, urljoin import botocore.exceptions +import PIL.ImageFile import requests import ruamel.yaml import tilecloud.store.redis @@ -360,6 +361,112 @@ def _fill_legend( previous_resolution = resolution +def _get_legend_image( + layer_name: str, + wms_layer: str, + resolution: float, + url: str, + session: requests.Session, + main_mime_type: str, + out: IO[str] | None = None, +) -> PIL.ImageFile.ImageFile | None: + _LOGGER.debug( + "Get legend image for layer '%s'-'%s', resolution '%s': %s", + layer_name, + wms_layer, + resolution, + url, + ) + try: + response = session.get(url) + except Exception as e: # pylint: disable=broad-exception-caught + if out is not None: + print( + "\n".join( + [ + f"Unable to get legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'", + url, + str(e), + ] + ), + file=out, + ) + _LOGGER.debug( + "Unable to get legend image for layer '%s'-'%s', resolution '%s'", + layer_name, + wms_layer, + resolution, + exc_info=True, + ) + return None + if response.status_code != 200: + if out is not None: + print( + "\n".join( + [ + f"Unable to get legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'", + url, + f"status code: {response.status_code}: {response.reason}", + response.text, + ] + ), + file=out, + ) + _LOGGER.debug( + "Unable to get legend image for layer '%s'-'%s', resolution '%s': %s", + layer_name, + wms_layer, + resolution, + response.text, + ) + return None + if not response.headers["Content-Type"].startswith(main_mime_type): + if out is not None: + print( + "\n".join( + [ + f"Unable to get legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'", + url, + f"Content-Type: {response.headers['Content-Type']}", + response.text, + ] + ), + file=out, + ) + _LOGGER.debug( + "Unable to get legend image for layer '%s'-'%s', resolution '%s', content-type: %s: %s", + layer_name, + wms_layer, + resolution, + response.headers["Content-Type"], + response.text, + ) + return None + try: + return Image.open(BytesIO(response.content)) + except Exception: # pylint: disable=broad-exception-caught + if out is not None: + print( + "\n".join( + [ + f"Unable to read legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'", + url, + response.text, + ] + ), + file=out, + ) + _LOGGER.debug( + "Unable to read legend image for layer '%s'-'%s', resolution '%s': %s", + layer_name, + wms_layer, + resolution, + response.text, + exc_info=True, + ) + return None + + def _generate_legend_images(gene: TileGeneration, out: IO[str] | None = None) -> None: assert gene.config_file config = gene.get_config(gene.config_file) @@ -389,100 +496,18 @@ def _generate_legend_images(gene: TileGeneration, out: IO[str] | None = None) -> } ) ) - _LOGGER.debug( - "Get legend image for layer '%s'-'%s', resolution '%s': %s", + legend_image = _get_legend_image( layer_name, wms_layer, resolution, url, + session, + layer["legend_mime"].split("/")[0], + out, ) - try: - response = session.get(url) - except Exception as e: # pylint: disable=broad-exception-caught - if out is not None: - print( - "\n".join( - [ - f"Unable to get legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'", - url, - str(e), - ] - ), - file=out, - ) - _LOGGER.debug( - "Unable to get legend image for layer '%s'-'%s', resolution '%s'", - layer_name, - wms_layer, - resolution, - exc_info=True, - ) - continue - if response.status_code != 200: - if out is not None: - print( - "\n".join( - [ - f"Unable to get legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'", - url, - f"status code: {response.status_code}: {response.reason}", - response.text, - ] - ), - file=out, - ) - _LOGGER.debug( - "Unable to get legend image for layer '%s'-'%s', resolution '%s': %s", - layer_name, - wms_layer, - resolution, - response.text, - ) - continue - if not response.headers["Content-Type"].startswith(layer["legend_mime"].split("/")[0]): - if out is not None: - print( - "\n".join( - [ - f"Unable to get legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'", - url, - f"Content-Type: {response.headers['Content-Type']}", - response.text, - ] - ), - file=out, - ) - _LOGGER.debug( - "Unable to get legend image for layer '%s'-'%s', resolution '%s', content-type: %s: %s", - layer_name, - wms_layer, - resolution, - response.headers["Content-Type"], - response.text, - ) - continue - try: - legends.append(Image.open(BytesIO(response.content))) - except Exception: # pylint: disable=broad-exception-caught - if out is not None: - print( - "\n".join( - [ - f"Unable to read legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'", - url, - response.text, - ] - ), - file=out, - ) - _LOGGER.debug( - "Unable to read legend image for layer '%s'-'%s', resolution '%s': %s", - layer_name, - wms_layer, - resolution, - response.text, - exc_info=True, - ) + if legend_image is not None: + legends.append(legend_image) + width = max(1, max(i.size[0] for i in legends) if legends else 0) height = max(1, sum(i.size[1] for i in legends) if legends else 0) image = Image.new("RGBA", (width, height))