Skip to content

Commit

Permalink
Fix log event with multiple topics (#8691)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Jun 12, 2024
1 parent 2482bd3 commit af237f0
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
25 changes: 23 additions & 2 deletions distributed/diagnostics/tests/test_scheduler_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,8 @@ async def start(self, scheduler: Scheduler) -> None:
self.scheduler = scheduler
self.scheduler._recorded_events = list() # type: ignore

def log_event(self, name, msg):
self.scheduler._recorded_events.append((name, msg))
def log_event(self, topic, msg):
self.scheduler._recorded_events.append((topic, msg))

await c.register_plugin(EventPlugin())

Expand All @@ -455,6 +455,27 @@ def f():
assert ("foo", 123) in s._recorded_events


@gen_cluster(client=True)
async def test_log_event_plugin_multiple_topics(c, s, a, b):
class EventPlugin(SchedulerPlugin):
async def start(self, scheduler: Scheduler) -> None:
self.scheduler = scheduler
self.scheduler._recorded_events = list() # type: ignore

def log_event(self, topic, msg):
self.scheduler._recorded_events.append((topic, msg))

await c.register_plugin(EventPlugin())

def f():
get_worker().log_event(["foo", "bar"], 123)

await c.submit(f)

assert ("foo", 123) in s._recorded_events
assert ("bar", 123) in s._recorded_events


@gen_cluster(client=True)
async def test_register_plugin_on_scheduler(c, s, a, b):
class MyPlugin(SchedulerPlugin):
Expand Down
17 changes: 7 additions & 10 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8413,19 +8413,16 @@ def log_event(self, topic: str | Collection[str], msg: Any) -> None:
Client.log_event
"""
event = (time(), msg)
if not isinstance(topic, str):
for t in topic:
self.events[t].append(event)
self.event_counts[t] += 1
self._report_event(t, event)
else:
self.events[topic].append(event)
self.event_counts[topic] += 1
self._report_event(topic, event)
if isinstance(topic, str):
topic = [topic]
for t in topic:
self.events[t].append(event)
self.event_counts[t] += 1
self._report_event(t, event)

for plugin in list(self.plugins.values()):
try:
plugin.log_event(topic, msg)
plugin.log_event(t, msg)
except Exception:
logger.info("Plugin failed with exception", exc_info=True)

Expand Down

0 comments on commit af237f0

Please sign in to comment.