Skip to content

Commit

Permalink
Refactor to reduce cognitive complexity
Browse files Browse the repository at this point in the history
  • Loading branch information
sbrunner committed Dec 1, 2024
1 parent a71a0e0 commit 2a098bb
Showing 1 changed file with 114 additions and 89 deletions.
203 changes: 114 additions & 89 deletions tilecloud_chain/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from urllib.parse import urlencode, urljoin

import botocore.exceptions
import PIL.ImageFile
import requests
import ruamel.yaml
import tilecloud.store.redis
Expand Down Expand Up @@ -360,6 +361,112 @@ def _fill_legend(
previous_resolution = resolution


def _get_legend_image(
layer_name: str,
wms_layer: str,
resolution: float,
url: str,
session: requests.Session,
main_mime_type: str,
out: IO[str] | None = None,
) -> PIL.ImageFile.ImageFile | None:
_LOGGER.debug(
"Get legend image for layer '%s'-'%s', resolution '%s': %s",
layer_name,
wms_layer,
resolution,
url,
)
try:
response = session.get(url)
except Exception as e: # pylint: disable=broad-exception-caught
if out is not None:
print(
"\n".join(
[
f"Unable to get legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'",
url,
str(e),
]
),
file=out,
)
_LOGGER.debug(
"Unable to get legend image for layer '%s'-'%s', resolution '%s'",
layer_name,
wms_layer,
resolution,
exc_info=True,
)
return None
if response.status_code != 200:
if out is not None:
print(
"\n".join(
[
f"Unable to get legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'",
url,
f"status code: {response.status_code}: {response.reason}",
response.text,
]
),
file=out,
)
_LOGGER.debug(
"Unable to get legend image for layer '%s'-'%s', resolution '%s': %s",
layer_name,
wms_layer,
resolution,
response.text,
)
return None
if not response.headers["Content-Type"].startswith(main_mime_type):
if out is not None:
print(
"\n".join(
[
f"Unable to get legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'",
url,
f"Content-Type: {response.headers['Content-Type']}",
response.text,
]
),
file=out,
)
_LOGGER.debug(
"Unable to get legend image for layer '%s'-'%s', resolution '%s', content-type: %s: %s",
layer_name,
wms_layer,
resolution,
response.headers["Content-Type"],
response.text,
)
return None
try:
return Image.open(BytesIO(response.content))
except Exception: # pylint: disable=broad-exception-caught
if out is not None:
print(
"\n".join(
[
f"Unable to read legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'",
url,
response.text,
]
),
file=out,
)
_LOGGER.debug(
"Unable to read legend image for layer '%s'-'%s', resolution '%s': %s",
layer_name,
wms_layer,
resolution,
response.text,
exc_info=True,
)
return None


def _generate_legend_images(gene: TileGeneration, out: IO[str] | None = None) -> None:
assert gene.config_file
config = gene.get_config(gene.config_file)
Expand Down Expand Up @@ -389,100 +496,18 @@ def _generate_legend_images(gene: TileGeneration, out: IO[str] | None = None) ->
}
)
)
_LOGGER.debug(
"Get legend image for layer '%s'-'%s', resolution '%s': %s",
legend_image = _get_legend_image(
layer_name,
wms_layer,
resolution,
url,
session,
layer["legend_mime"].split("/")[0],
out,
)
try:
response = session.get(url)
except Exception as e: # pylint: disable=broad-exception-caught
if out is not None:
print(
"\n".join(
[
f"Unable to get legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'",
url,
str(e),
]
),
file=out,
)
_LOGGER.debug(
"Unable to get legend image for layer '%s'-'%s', resolution '%s'",
layer_name,
wms_layer,
resolution,
exc_info=True,
)
continue
if response.status_code != 200:
if out is not None:
print(
"\n".join(
[
f"Unable to get legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'",
url,
f"status code: {response.status_code}: {response.reason}",
response.text,
]
),
file=out,
)
_LOGGER.debug(
"Unable to get legend image for layer '%s'-'%s', resolution '%s': %s",
layer_name,
wms_layer,
resolution,
response.text,
)
continue
if not response.headers["Content-Type"].startswith(layer["legend_mime"].split("/")[0]):
if out is not None:
print(
"\n".join(
[
f"Unable to get legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'",
url,
f"Content-Type: {response.headers['Content-Type']}",
response.text,
]
),
file=out,
)
_LOGGER.debug(
"Unable to get legend image for layer '%s'-'%s', resolution '%s', content-type: %s: %s",
layer_name,
wms_layer,
resolution,
response.headers["Content-Type"],
response.text,
)
continue
try:
legends.append(Image.open(BytesIO(response.content)))
except Exception: # pylint: disable=broad-exception-caught
if out is not None:
print(
"\n".join(
[
f"Unable to read legend image for layer '{layer_name}'-'{wms_layer}', resolution '{resolution}'",
url,
response.text,
]
),
file=out,
)
_LOGGER.debug(
"Unable to read legend image for layer '%s'-'%s', resolution '%s': %s",
layer_name,
wms_layer,
resolution,
response.text,
exc_info=True,
)
if legend_image is not None:
legends.append(legend_image)

width = max(1, max(i.size[0] for i in legends) if legends else 0)
height = max(1, sum(i.size[1] for i in legends) if legends else 0)
image = Image.new("RGBA", (width, height))
Expand Down

0 comments on commit 2a098bb

Please sign in to comment.