Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added code to honor --log-cli-level (issue #402) #883

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
10 changes: 9 additions & 1 deletion src/xdist/dsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
LoadGroupScheduling,
WorkStealingScheduling,
)

import logging
import pickle

from queue import Empty, Queue

Expand Down Expand Up @@ -286,6 +287,13 @@ def worker_testreport(self, node, rep):
self.config.hook.pytest_runtest_logreport(report=rep)
self._handlefailures(rep)

def worker_runtest_logmessage(self, node, record):
record = pickle.loads(record)
for handler in logging.getLogger().handlers:
process = record.levelno >= handler.level
if process:
handler.handle(record)

def worker_runtest_protocol_complete(self, node, item_index, duration):
"""
Emitted when a node fires the 'runtest_protocol_complete' event,
Expand Down
56 changes: 56 additions & 0 deletions src/xdist/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import os
import time
from typing import Any
import pickle
import logging
import copy

import pytest
from execnet.gateway_base import dumps, DumpError
Expand Down Expand Up @@ -56,6 +59,48 @@ def worker_title(title):
pass


class RemoteMessageHandler(logging.Handler):
"""
This handler sends events to a queue. Typically, it would be used together
with a multiprocessing Queue to centralise logging to file in one process
(in a multi-process application), so as to avoid file write contention
between processes.

This code is new in Python 3.2, but this class can be copy pasted into
kurt-cb marked this conversation as resolved.
Show resolved Hide resolved
user code for use with earlier Python versions.

Largely based on QueueHandler in handlers.py in cpython :
Source: https://github.com/python/cpython/blob/8f324b7ecd2df3036fab098c4c8ac185ac07b277/Lib/logging/handlers.py#L1412
"""

def __init__(self, queue):
"""
Initialise an instance, using the passed queue.
"""
logging.Handler.__init__(self)
self.queue = queue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit confusing calling this queue when it is actually a WorkerInteractor instance, perhaps use worker_interactor? Also worth updating the docs.


def emit(self, record):
"""
Emit a record.

Writes the LogRecord to the queue, preparing it for pickling first.
"""
try:
msg = self.format(record)
# bpo-35726: make copy of record to avoid affecting other handlers in the chain.
record = copy.copy(record)
record.message = msg
record.msg = msg
record.args = None
record.exc_info = None
record.exc_text = None
x = pickle.dumps(record)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably does not matter much, but we might force protocol version 4 here to ensure compatibility between different python interpreters.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

x = pickle.dumps(record, protocol=4)
or
x = pickle.dumps(record, protocol=pickle.DEFAULT_PROTOCOL)
?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are log messages pickle safe?
im "pretty" sure there some hazard when dealing with custom objects

self.queue.send_log(x)
except Exception:
self.handleError(record)


class WorkerInteractor:
SHUTDOWN_MARK = object()
QUEUE_REPLACED_MARK = object()
Expand All @@ -70,6 +115,14 @@ def __init__(self, config, channel):
self.nextitem_index = None
config.pluginmanager.register(self)

# pump cli messages back to master if a level is set
if config.option.log_cli_level:
rootlog = logging.getLogger()
myhandler = RemoteMessageHandler(self)
rootlog.addHandler(myhandler)
level = logging.getLevelName(config.option.log_cli_level)
myhandler.setLevel(level)

def _make_queue(self):
return self.channel.gateway.execmodel.queue.Queue()

Expand All @@ -86,6 +139,9 @@ def sendevent(self, name, **kwargs):
self.log("sending", name, kwargs)
self.channel.send((name, kwargs))

def send_log(self, pickled_record):
self.sendevent("runtest_logmessage", pickled_record=pickled_record)

@pytest.hookimpl
def pytest_internalerror(self, excrepr):
formatted_error = str(excrepr)
Expand Down
2 changes: 2 additions & 0 deletions src/xdist/workermanage.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ def process_from_remote(self, eventcall): # noqa too complex
self.notify_inproc(eventname, node=self, ids=kwargs["ids"])
elif eventname == "runtest_protocol_complete":
self.notify_inproc(eventname, node=self, **kwargs)
elif eventname == "runtest_logmessage":
self.notify_inproc(eventname, node=self, **kwargs)
elif eventname == "unscheduled":
self.notify_inproc(eventname, node=self, **kwargs)
elif eventname == "logwarning":
Expand Down