Lazy logging part 1 #16893

Draft · wants to merge 7 commits into base: dev
18 changes: 9 additions & 9 deletions lib/galaxy/jobs/runners/__init__.py
@@ -116,7 +116,7 @@ def _init_worker_threads(self):
"""Start ``nworkers`` worker threads."""
self.work_queue = Queue()
self.work_threads = []
log.debug(f"Starting {self.nworkers} {self.runner_name} workers")
log.debug("Starting %s %s workers", self.nworkers, self.runner_name)
Member:

Not to open a giant can of worms, and I know this is only adhering to what we've put in the style guide, but... Do we really want to make this choice for performance? Is it really significant? The former is more naturally readable and I think I'd weigh that more heavily than what I assume are barely significant performance gains in debug log emission.

Contributor Author:

Performance is orders of magnitude better if log.debug messages are disabled (i.e. the log level is logging.INFO or higher). The logging framework defers formatting log messages until it cannot be avoided for exactly this reason. It is also arguably safer if the string interpolation raises an exception, although that is less of an issue with f-strings.

Whether it is worth it is another issue. I would advocate strongly for f-strings in all contexts other than logging; for logging, I think performance and safety outweigh the minor readability improvements. The process is automated, so the cost of doing this is minimal. Having said that, if done project-wide this does have the potential to modify a hundred or so files, so we (I) need to ensure the code being generated is correct.
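
For readers skimming the thread, here is a minimal, self-contained sketch (plain Python, not Galaxy code) of the deferral being described; with the printf-style form the argument is only formatted if a handler actually emits the record:

```python
import logging

logging.basicConfig(level=logging.INFO)  # DEBUG records are suppressed
log = logging.getLogger(__name__)


class Expensive:
    """Stand-in for a value whose string rendering is costly."""

    def __str__(self):
        print("formatting happened")
        return "expensive"


value = Expensive()

# f-string: the message is built eagerly, before log.debug() even runs,
# so __str__ is called even though the record is discarded.
log.debug(f"value is {value}")

# Lazy printf-style form: logging stores the template and arguments and only
# formats them if the record is actually emitted, so __str__ never runs here.
log.debug("value is %s", value)

# The level check that makes the lazy form cheap is roughly this guard:
if log.isEnabledFor(logging.DEBUG):
    log.debug("value is %s", value)
```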

Member:

We had a discussion on this this summer (here's the link). I see Dannon's point; however, it's an accepted best practice, and I don't think readability is significantly affected by printf-style syntax.

Member @mvdbeek (Oct 23, 2023):

Performance is not a valid argument; logging is never a bottleneck for Galaxy. Speeding up fast code by 10X (or any factor, really) brings you nothing overall, and f-strings are certainly more readable. The addition to the style guide was pretty contested, and the link you posted, @jdavcs, was the start of the discussion; see also #16312 (comment). I don't think we should touch existing code, and if we do then we should add the corresponding pylint rules and do it all in one batch.

Member:

We agreed on adding this to our style guide. IMHO, there's nothing wrong with editing existing code (especially something as trivial as logging messages) to adhere to our style guide. If we disagree with the style guide, I think we should change the style guide first.
If we can add pylint rules for this, great; although doing it all in one batch would require significantly more upfront work, which is why I think fixing one step at a time is fine too.

Contributor Author:

And @dannon I should also add that those numbers are for messages that are actually logged; nothing is going to speed those up. If the string needs to be formatted, the string needs to be formatted. Any performance gains would be from messages that are not logged.
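
A rough way to see that point for yourself (this is not a Galaxy benchmark, and the absolute numbers will vary by machine): the two forms only diverge when the record is suppressed.

```python
import io
import logging
import timeit

log = logging.getLogger("bench")
log.addHandler(logging.StreamHandler(io.StringIO()))  # discard output, but still format it
log.propagate = False

payload = list(range(50))
N = 100_000

for level, label in [(logging.INFO, "suppressed"), (logging.DEBUG, "emitted")]:
    log.setLevel(level)
    eager = timeit.timeit(lambda: log.debug(f"payload={payload}"), number=N)
    lazy = timeit.timeit(lambda: log.debug("payload=%s", payload), number=N)
    print(f"{label:>10}: eager={eager:.3f}s lazy={lazy:.3f}s")
```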

Member:

Don't close it. I think the choice we made for the style guide is contentious and I personally would have gone the other way, but it's really okay and I don't want to make us relitigate that here -- I just wanted to put a little context around the scale of the performance argument. You can feel safe to add as many log statements as you want and it won't make a dent, with or without f-strings :)

Again, thanks for the effort. I'll just second what Marius mentioned before -- we should also include a linting rule, whether pylint, ruff, or whatever, to enforce this moving forward if we're going to change it across the board, and we need to make sure this is well tested and closely reviewed.
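
For what it's worth, both linters already ship rules in this area; a small sketch of what they would flag (rule codes quoted from memory, worth double-checking against the current pylint/ruff docs):

```python
# pylint: W1203 logging-fstring-interpolation, W1201 logging-not-lazy
# ruff (flake8-logging-format): G004 logging-f-string, G002 logging-percent-format
import logging

log = logging.getLogger(__name__)
job_id = 42

log.debug(f"({job_id}) command is ready")    # would be flagged: f-string built eagerly
log.debug("(%s) command is ready" % job_id)  # would be flagged: eager % interpolation
log.debug("(%s) command is ready", job_id)   # accepted: lazy, printf-style arguments
```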

Member:

@ksuderman please ping me when it's ready for review (I assume while in draft state it's not there yet).

Member:

Just wanted to point out that there is a lot of work going into f-strings, in particular (but not limited to) Python 3.12:

https://realpython.com/python312-f-strings/
https://peps.python.org/pep-0701/

Migrating away from f-strings feels wrong, especially mid-term.
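
For context, a quick sketch of the kind of thing PEP 701 enables (requires Python 3.12+; none of it changes the logging question either way, since these features live inside the replacement fields):

```python
names = ["alpha", "beta", "gamma"]

# Quote reuse inside a replacement field -- a SyntaxError before 3.12:
print(f"names: {", ".join(names)}")

# Multi-line expressions and comments inside an f-string:
print(f"total characters: {
    sum(len(n) for n in names)  # whitespace and comments are allowed here in 3.12
}")
```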

Contributor Author:

I agree that if we do go ahead with this we need to be 100% confident the changes are correct. With this PR I am 99% confident I can prove the generated statements are correct, or, if they are not, that the original statements were not correct either. And I just had an idea on how to test each change to show that each one produces the same output as the original statement.

@jdavcs will do. This PR was just a proof of concept intended for discussion. I didn't think it would be such a hot topic.

@bgruening that should not be an issue. Any changes to f-strings will be handled by the Python parser. The latest version of my program simply rearranges nodes in the syntax tree for logging method calls. Only a handful of the logging method calls in the Galaxy code base do anything more than simple variable substitution. If the program does encounter a statement that does fancy formatting, it prints a warning, skips that statement, and moves on. And this is just for logging methods; f-strings should still be preferred for all other string formatting operations.
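
To make the approach concrete, here is a minimal sketch of that kind of syntax-tree rewrite. It is not the actual script behind this PR, it assumes the module-level logger is named `log`, and it only handles plain `{expr}` substitutions, skipping anything with a conversion or format spec in the spirit of the warn-and-skip rule described above:

```python
import ast

LOG_METHODS = {"debug", "info", "warning", "error", "exception", "critical"}


class LazyLogging(ast.NodeTransformer):
    """Rewrite log.<method>(f"...{x}...") into log.<method>("...%s...", x)."""

    def visit_Call(self, node):
        self.generic_visit(node)
        func = node.func
        if (
            isinstance(func, ast.Attribute)
            and func.attr in LOG_METHODS
            and isinstance(func.value, ast.Name)
            and func.value.id == "log"  # assumption: logger is named "log"
            and node.args
            and isinstance(node.args[0], ast.JoinedStr)
        ):
            template, args = [], []
            for part in node.args[0].values:
                if isinstance(part, ast.Constant):
                    # literal text; escape % so printf-style formatting is safe
                    template.append(str(part.value).replace("%", "%%"))
                elif part.conversion == -1 and part.format_spec is None:
                    template.append("%s")
                    args.append(part.value)
                else:
                    print(f"skipping fancy formatting on line {node.lineno}")
                    return node
            node.args = [ast.Constant(value="".join(template)), *args, *node.args[1:]]
        return node


tree = ast.parse('log.debug(f"Job [{job_wrapper.job_id}] queued {put_timer}")')
print(ast.unparse(ast.fix_missing_locations(LazyLogging().visit(tree))))
# -> log.debug('Job [%s] queued %s', job_wrapper.job_id, put_timer)
```

The per-statement equivalence test mentioned above could then amount to rendering the original f-string and the rewritten `template % tuple(args)` against the same variable bindings and asserting the two strings match.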

for i in range(self.nworkers):
worker = threading.Thread(name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next)
worker.daemon = True
@@ -201,11 +201,11 @@ def put(self, job_wrapper: "MinimalJobWrapper"):
# Required for exceptions thrown by object store incompatiblity.
# tested by test/integration/objectstore/test_private_handling.py
job_wrapper.fail(str(e), exception=e)
log.debug(f"Job [{job_wrapper.job_id}] failed to queue {put_timer}")
log.debug("Job [%s] failed to queue %s", job_wrapper.job_id, put_timer)
return
if queue_job:
self.mark_as_queued(job_wrapper)
log.debug(f"Job [{job_wrapper.job_id}] queued {put_timer}")
log.debug("Job [%s] queued %s", job_wrapper.job_id, put_timer)

def mark_as_queued(self, job_wrapper: "MinimalJobWrapper"):
self.work_queue.put((self.queue_job, job_wrapper))
@@ -278,12 +278,12 @@ def prepare_job(

# Make sure the job hasn't been deleted
if job_state == model.Job.states.DELETED:
log.debug(f"({job_id}) Job deleted by user before it entered the {self.runner_name} queue")
log.debug("(%s) Job deleted by user before it entered the %s queue", job_id, self.runner_name)
if self.app.config.cleanup_job in ("always", "onsuccess"):
job_wrapper.cleanup()
return False
elif job_state != model.Job.states.QUEUED:
log.info(f"({job_id}) Job is in state {job_state}, skipping execution")
log.info("(%s) Job is in state %s, skipping execution", job_id, job_state)
# cleanup may not be safe in all states
return False

@@ -475,7 +475,7 @@ def _handle_metadata_externally(self, job_wrapper: "MinimalJobWrapper", resolve_
env=os.environ,
preexec_fn=os.setpgrp,
)
log.debug("execution of external set_meta for job %d finished" % job_wrapper.job_id)
log.debug("execution of external set_meta for job %d finished", job_wrapper.job_id)

def get_job_file(self, job_wrapper: "MinimalJobWrapper", **kwds) -> str:
job_metrics = job_wrapper.app.job_metrics
@@ -504,7 +504,7 @@ def get_job_file(self, job_wrapper: "MinimalJobWrapper", **kwds) -> str:
# Additional logging to enable if debugging from_work_dir handling, metadata
# commands, etc... (or just peak in the job script.)
job_id = job_wrapper.job_id
log.debug(f"({job_id}) command is: {command_line}")
log.debug("(%s) command is: %s", job_id, command_line)
options.update(**kwds)
return job_script(**options)

@@ -713,7 +713,7 @@ def cleanup(self):
prefix = f"({self.job_wrapper.get_id_tag()})"
else:
prefix = f"({self.job_wrapper.get_id_tag()}/{self.job_id})"
log.debug(f"{prefix} Unable to cleanup {file}: {unicodify(e)}")
log.debug("%s Unable to cleanup %s: %s", prefix, file, unicodify(e))


class AsynchronousJobState(JobState):
@@ -840,7 +840,7 @@ def monitor_job(self, job_state):

def shutdown(self):
"""Attempts to gracefully shut down the monitor thread"""
log.info(f"{self.runner_name}: Sending stop signal to monitor thread")
log.info("%s: Sending stop signal to monitor thread", self.runner_name)
self.monitor_queue.put(STOP_SIGNAL)
# Call the parent's shutdown method to stop workers
self.shutdown_monitor()
10 changes: 5 additions & 5 deletions lib/galaxy/jobs/runners/aws.py
@@ -217,9 +217,9 @@ def __init__(self, app, nworkers, **kwargs):
self._batch_client = session.client("batch")

def queue_job(self, job_wrapper):
log.debug(f"Starting queue_job for job {job_wrapper.get_id_tag()}")
log.debug("Starting queue_job for job %s", job_wrapper.get_id_tag())
if not self.prepare_job(job_wrapper, include_metadata=False, modify_command_for_container=False):
log.debug(f"Not ready {job_wrapper.get_id_tag()}")
log.debug("Not ready %s", job_wrapper.get_id_tag())
return

job_destination = job_wrapper.job_destination
@@ -252,7 +252,7 @@ def _get_job_definition(self, job_wrapper, destination_params):
jd_arn = self._register_job_definition(jd_name, container_image, destination_params)
else:
jd_arn = res["jobDefinitions"][0]["jobDefinitionArn"]
log.debug(f"Found existing job definition: {jd_name}.")
log.debug("Found existing job definition: %s.", jd_name)

return jd_arn

@@ -325,7 +325,7 @@ def _get_retry_strategy(self, destination_params):
return strategy

def _register_job_definition(self, jd_name, container_image, destination_params):
log.debug(f"Registering a new job definition: {jd_name}.")
log.debug("Registering a new job definition: %s.", jd_name)
platform = destination_params.get("platform")
volumes, mount_points = self._get_mount_volumes(destination_params)

@@ -375,7 +375,7 @@ def _submit_job(self, job_def, job_wrapper, destination_params):
job_name = self.JOB_NAME_PREFIX + job_wrapper.get_id_tag()
command_script_path = self.write_command(job_wrapper)

log.info(f"Submitting job {job_name} to AWS Batch.")
log.info("Submitting job %s to AWS Batch.", job_name)
res = self._batch_client.submit_job(
jobName=job_name,
jobQueue=destination_params.get("job_queue"),
38 changes: 25 additions & 13 deletions lib/galaxy/jobs/runners/cli.py
@@ -50,7 +50,7 @@ def url_to_destination(self, url):
job_params = {f"job_{k}": v for k, v in [kv.split("=", 1) for kv in job_params.split("&")]}
params.update(shell_params)
params.update(job_params)
log.debug(f"Converted URL '{url}' to destination runner=cli, params={params}")
log.debug("Converted URL '%s' to destination runner=cli, params=%s", url, params)
# Create a dynamic JobDestination
return JobDestination(runner="cli", params=params)

@@ -96,7 +96,7 @@ def queue_job(self, job_wrapper):
job_wrapper.cleanup()
return

log.debug(f"({galaxy_id_tag}) submitting file: {ajs.job_file}")
log.debug("(%s) submitting file: %s", galaxy_id_tag, ajs.job_file)

returncode, stdout = self.submit(shell, job_interface, ajs.job_file, galaxy_id_tag, retry=MAX_SUBMIT_RETRY)
if returncode != 0:
@@ -107,11 +107,11 @@ def queue_job(self, job_wrapper):
submit_stdout = stdout.strip()
external_job_id = submit_stdout and submit_stdout.split()[-1]
if not external_job_id:
log.error(f"({galaxy_id_tag}) submission did not return a job identifier, failing job")
log.error("(%s) submission did not return a job identifier, failing job", galaxy_id_tag)
job_wrapper.fail("failure submitting job")
return

log.info(f"({galaxy_id_tag}) queued with identifier: {external_job_id}")
log.info("(%s) queued with identifier: %s", galaxy_id_tag, external_job_id)

# store runner information for tracking if Galaxy restarts
job_wrapper.set_external_id(external_job_id)
@@ -137,7 +137,9 @@ def submit(self, shell, job_interface, job_file, galaxy_id_tag, retry=MAX_SUBMIT
stdout = cmd_out.stdout
if not stdout or not stdout.strip():
log.warning(
f"({galaxy_id_tag}) Execute returned a 0 exit code but no external identifier will be recovered from empty stdout - stderr is {cmd_out.stderr}"
"(%s) Execute returned a 0 exit code but no external identifier will be recovered from empty stdout - stderr is %s",
galaxy_id_tag,
cmd_out.stderr,
)
return returncode, stdout
stdout = f"({galaxy_id_tag}) submission failed (stdout): {cmd_out.stdout}"
@@ -170,18 +172,20 @@ def check_watched_items(self):
if ajs.job_wrapper.get_state() == model.Job.states.DELETED:
continue

log.debug(f"({id_tag}/{external_job_id}) job not found in batch state check")
log.debug("(%s/%s) job not found in batch state check", id_tag, external_job_id)
shell_params, job_params = self.parse_destination_params(ajs.job_destination.params)
shell, job_interface = self.get_cli_plugins(shell_params, job_params)
cmd_out = shell.execute(job_interface.get_single_status(external_job_id))
state = job_interface.parse_single_status(cmd_out.stdout, external_job_id)
if not state == model.Job.states.OK:
log.warning(
f"({id_tag}/{external_job_id}) job not found in batch state check, but found in individual state check"
"(%s/%s) job not found in batch state check, but found in individual state check",
id_tag,
external_job_id,
)
job_state = ajs.job_wrapper.get_state()
if state != old_state:
log.debug(f"({id_tag}/{external_job_id}) state change: from {old_state} to {state}")
log.debug("(%s/%s) state change: from %s to %s", id_tag, external_job_id, old_state, state)
if state == model.Job.states.ERROR and job_state != model.Job.states.STOPPED:
# Try to find out the reason for exiting - this needs to happen before change_state
# otherwise jobs depending on resubmission outputs see that job as failed and pause.
@@ -201,7 +205,7 @@ def check_watched_items(self):
)
if external_metadata:
self.work_queue.put((self.handle_metadata_externally, ajs))
log.debug(f"({id_tag}/{external_job_id}) job execution finished, running job wrapper finish method")
log.debug("(%s/%s) job execution finished, running job wrapper finish method", id_tag, external_job_id)
self.work_queue.put((self.finish_job, ajs))
else:
new_watched.append(ajs)
@@ -253,10 +257,13 @@ def stop_job(self, job_wrapper):
shell, job_interface = self.get_cli_plugins(shell_params, job_params)
cmd_out = shell.execute(job_interface.delete(job.job_runner_external_id))
assert cmd_out.returncode == 0, cmd_out.stderr
log.debug(f"({job.id}/{job.job_runner_external_id}) Terminated at user's request")
log.debug("(%s/%s) Terminated at user's request", job.id, job.job_runner_external_id)
except Exception as e:
log.debug(
f"({job.id}/{job.job_runner_external_id}) User killed running job, but error encountered during termination: {e}"
"(%s/%s) User killed running job, but error encountered during termination: %s",
job.id,
job.job_runner_external_id,
e,
)

def recover(self, job, job_wrapper):
Expand All @@ -274,14 +281,19 @@ def recover(self, job, job_wrapper):
ajs.command_line = job.command_line
if job.state in (model.Job.states.RUNNING, model.Job.states.STOPPED):
log.debug(
f"({job.id}/{job.job_runner_external_id}) is still in {job.state} state, adding to the runner monitor queue"
"(%s/%s) is still in %s state, adding to the runner monitor queue",
job.id,
job.job_runner_external_id,
job.state,
)
ajs.old_state = model.Job.states.RUNNING
ajs.running = True
self.monitor_queue.put(ajs)
elif job.state == model.Job.states.QUEUED:
log.debug(
f"({job.id}/{job.job_runner_external_id}) is still in queued state, adding to the runner monitor queue"
"(%s/%s) is still in queued state, adding to the runner monitor queue",
job.id,
job.job_runner_external_id,
)
ajs.old_state = model.Job.states.QUEUED
ajs.running = False
37 changes: 21 additions & 16 deletions lib/galaxy/jobs/runners/condor.py
@@ -132,11 +132,11 @@ def queue_job(self, job_wrapper):
job_wrapper.cleanup()
return

log.debug(f"({galaxy_id_tag}) submitting file {executable}")
log.debug("(%s) submitting file %s", galaxy_id_tag, executable)

external_job_id, message = condor_submit(submit_file)
if external_job_id is None:
log.debug(f"condor_submit failed for job {job_wrapper.get_id_tag()}: {message}")
log.debug("condor_submit failed for job %s: %s", job_wrapper.get_id_tag(), message)
if self.app.config.cleanup_job == "always":
os.unlink(submit_file)
cjs.cleanup()
Expand All @@ -145,7 +145,7 @@ def queue_job(self, job_wrapper):

os.unlink(submit_file)

log.info(f"({galaxy_id_tag}) queued as {external_job_id}")
log.info("(%s) queued as %s", galaxy_id_tag, external_job_id)

# store runner information for tracking if Galaxy restarts
job_wrapper.set_external_id(external_job_id)
@@ -181,7 +181,7 @@ def check_watched_items(self):
except Exception:
# so we don't kill the monitor thread
log.exception(f"({galaxy_id_tag}/{job_id}) Unable to check job status")
log.warning(f"({galaxy_id_tag}/{job_id}) job will now be errored")
log.warning("(%s/%s) job will now be errored", galaxy_id_tag, job_id)
cjs.fail_message = "Cluster could not complete job"
self.work_queue.put((self.fail_job, cjs))
continue
@@ -191,10 +191,10 @@ def check_watched_items(self):
cjs.job_wrapper.check_for_entry_points()

if job_running and not cjs.running:
log.debug(f"({galaxy_id_tag}/{job_id}) job is now running")
log.debug("(%s/%s) job is now running", galaxy_id_tag, job_id)
cjs.job_wrapper.change_state(model.Job.states.RUNNING)
if not job_running and cjs.running:
log.debug(f"({galaxy_id_tag}/{job_id}) job has stopped running")
log.debug("(%s/%s) job has stopped running", galaxy_id_tag, job_id)
# Will switching from RUNNING to QUEUED confuse Galaxy?
# cjs.job_wrapper.change_state( model.Job.states.QUEUED )
job_state = cjs.job_wrapper.get_state()
@@ -205,11 +205,11 @@ def check_watched_items(self):
)
if external_metadata:
self._handle_metadata_externally(cjs.job_wrapper, resolve_requirements=True)
log.debug(f"({galaxy_id_tag}/{job_id}) job has completed")
log.debug("(%s/%s) job has completed", galaxy_id_tag, job_id)
self.work_queue.put((self.finish_job, cjs))
continue
if job_failed:
log.debug(f"({galaxy_id_tag}/{job_id}) job failed")
log.debug("(%s/%s) job failed", galaxy_id_tag, job_id)
cjs.failed = True
self.work_queue.put((self.fail_job, cjs))
continue
@@ -225,7 +225,7 @@ def stop_job(self, job_wrapper):
galaxy_id_tag = job_wrapper.get_id_tag()
if job.container:
try:
log.info(f"stop_job(): {job.id}: trying to stop container .... ({external_id})")
log.info("stop_job(): %s: trying to stop container .... (%s)", job.id, external_id)
# self.watched = [cjs for cjs in self.watched if cjs.job_id != external_id]
new_watch_list = list()
cjs = None
@@ -244,21 +244,21 @@ def stop_job(self, job_wrapper):
)
if external_metadata:
self._handle_metadata_externally(cjs.job_wrapper, resolve_requirements=True)
log.debug(f"({galaxy_id_tag}/{external_id}) job has completed")
log.debug("(%s/%s) job has completed", galaxy_id_tag, external_id)
self.work_queue.put((self.finish_job, cjs))
except Exception as e:
log.warning(f"stop_job(): {job.id}: trying to stop container failed. ({e})")
log.warning("stop_job(): %s: trying to stop container failed. (%s)", job.id, e)
try:
self._kill_container(job_wrapper)
except Exception as e:
log.warning(f"stop_job(): {job.id}: trying to kill container failed. ({e})")
log.warning("stop_job(): %s: trying to kill container failed. (%s)", job.id, e)
failure_message = condor_stop(external_id)
if failure_message:
log.debug(f"({external_id}). Failed to stop condor {failure_message}")
log.debug("(%s). Failed to stop condor %s", external_id, failure_message)
else:
failure_message = condor_stop(external_id)
if failure_message:
log.debug(f"({external_id}). Failed to stop condor {failure_message}")
log.debug("(%s). Failed to stop condor %s", external_id, failure_message)

def recover(self, job, job_wrapper):
"""Recovers jobs stuck in the queued/running state when Galaxy started"""
@@ -277,12 +277,17 @@ def recover(self, job, job_wrapper):
cjs.register_cleanup_file_attribute("user_log")
if job.state in (model.Job.states.RUNNING, model.Job.states.STOPPED):
log.debug(
f"({job.id}/{job.get_job_runner_external_id()}) is still in {job.state} state, adding to the DRM queue"
"(%s/%s) is still in %s state, adding to the DRM queue",
job.id,
job.get_job_runner_external_id(),
job.state,
)
cjs.running = True
self.monitor_queue.put(cjs)
elif job.state == model.Job.states.QUEUED:
log.debug(f"({job.id}/{job.job_runner_external_id}) is still in DRM queued state, adding to the DRM queue")
log.debug(
"(%s/%s) is still in DRM queued state, adding to the DRM queue", job.id, job.job_runner_external_id
)
cjs.running = False
self.monitor_queue.put(cjs)

Expand Down