Remove dispatching in TaskCollection #8903

Merged

fjetter
Member

@fjetter fjetter commented Oct 23, 2024

We refactored the TaskState a while back such that the TaskPrefix and TaskGroup classes are subclasses of TaskCollection and use similar interfaces to update their internal state. See #8681 and the linked ticket for context.

For large graphs (see the py-spy profile below, taken from the high-level climatology workload of the Coiled geo benchmarks), this class structure adds notable overhead.

scheduler.zip

This screenshot shows one of the many papercuts we're dealing with here.

(This is a left-heavy view of client_releases_keys for about 1M tasks.)

image

The state update/transition and other related state updates are popping up all over the place: tiny papercuts that add up over time. I haven't measured how this plays out end to end. It is likely not a huge deal, but small things add up on these hot paths.

Micro benchmarks
all_states = [
    "released",
    "waiting",
    "no-worker",
    "queued",
    "processing",
    "memory",
    "erred",
    "forgotten",
]

class TaskCollection:
    def __init__(self):
        self.states = dict.fromkeys(all_states, 0)
    
    def transition(self, old, new):
        self.states[old] -= 1
        self.states[new] += 1

class TaskGroup(TaskCollection):
    def __init__(self):
        super().__init__()

    def transition(self, old, new):
        super().transition(old, new)
        
class TaskGroupNoSuper(TaskCollection):
    def __init__(self):
        TaskCollection.__init__(self)

    def transition(self, old, new):
        TaskCollection.transition(self, old, new)


class TaskState:
    def __init__(self):
        self.group = TaskGroup()
        self._state = "released"

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value) -> None:
        self.group.transition(self._state, value)
        self._state = value

class TaskStateNoSuper:
    def __init__(self):
        self.group = TaskGroupNoSuper()
        self._state = "released"

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value) -> None:
        self.group.transition(self._state, value)
        self._state = value

class TaskStateNoIndirection:
    def __init__(self):
        self.group = TaskGroupNoSuper()
        self._state = "released"

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value) -> None:
        self.group.states[self._state] -= 1
        self.group.states[value] += 1
        self._state = value

image

The micro benchmarks (see above) show that ditching the indirection entirely speeds this up by a factor of 2. That is not game changing, but this abstraction is not worth any performance penalty.
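For anyone who wants to reproduce a comparison like this locally, here is a minimal sketch using `timeit`. It is not the harness used for the numbers above. The class names (`Collection`, `GroupViaSuper`, `TaskViaDispatch`, `TaskInlined`) and the helpers `cycle` and `measure` are hypothetical, simplified stand-ins for the classes in the micro benchmark, and absolute timings will depend on the interpreter version and machine.

```python
import timeit

# Reduced state list; the benchmark above uses the full set of scheduler states.
STATES = ["released", "waiting", "processing", "memory"]


class Collection:
    def __init__(self):
        self.states = dict.fromkeys(STATES, 0)

    def transition(self, old, new):
        self.states[old] -= 1
        self.states[new] += 1


class GroupViaSuper(Collection):
    # Adds one extra method call plus a super() lookup per transition.
    def transition(self, old, new):
        super().transition(old, new)


class TaskViaDispatch:
    def __init__(self):
        self.group = GroupViaSuper()
        self._state = "released"

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self.group.transition(self._state, value)
        self._state = value


class TaskInlined:
    def __init__(self):
        self.group = Collection()
        self._state = "released"

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        # Same bookkeeping as Collection.transition, without any call.
        states = self.group.states
        states[self._state] -= 1
        states[value] += 1
        self._state = value


def cycle(task, rounds=1000):
    """Drive the state setter through every state, `rounds` times."""
    for _ in range(rounds):
        for s in STATES:
            task.state = s


def measure(task_cls, number=50):
    """Total wall time in seconds for `number` calls to cycle()."""
    task = task_cls()
    return timeit.timeit(lambda: cycle(task), number=number)


if __name__ == "__main__":
    for cls in (TaskViaDispatch, TaskInlined):
        print(f"{cls.__name__}: {measure(cls):.4f} s")
```

Both variants perform identical bookkeeping, so any difference in the printed times comes from the call and `super()` overhead alone.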

@@ -1149,7 +1130,7 @@ class TaskGroup(TaskCollection):
     __slots__ = tuple(__annotations__)

     def __init__(self, name: str, prefix: TaskPrefix):
-        super().__init__(name)
+        TaskCollection.__init__(self, name)
Member Author

super() is convenient and very useful in complex MROs but it also adds overhead which we don't need/want here.
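As an illustration of the trade-off, here is a small sketch with hypothetical classes (`Base`, `WithSuper`, `Explicit`, and the helper `snapshot` are not the scheduler classes). With single inheritance, calling the base class initializer explicitly yields the same object state as `super().__init__`, while skipping the creation of the `super` proxy object. The caveat is that an explicit call bypasses cooperative MRO resolution, so it is only a safe swap when no multiple inheritance is involved.

```python
class Base:
    __slots__ = ("name", "nbytes_total")

    def __init__(self, name):
        self.name = name
        self.nbytes_total = 0


class WithSuper(Base):
    __slots__ = ("prefix",)

    def __init__(self, name, prefix):
        # Resolved through the MRO at call time.
        super().__init__(name)
        self.prefix = prefix


class Explicit(Base):
    __slots__ = ("prefix",)

    def __init__(self, name, prefix):
        # Direct call on the known base class; no super proxy involved.
        Base.__init__(self, name)
        self.prefix = prefix


def snapshot(obj):
    """Return the instance state as a tuple for comparison."""
    return (obj.name, obj.nbytes_total, obj.prefix)


if __name__ == "__main__":
    print(snapshot(WithSuper("inc-123", "inc")))
    print(snapshot(Explicit("inc-123", "inc")))
```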

Comment on lines -1185 to -1186
-        super().update_nbytes(diff)
-        self.prefix.update_nbytes(diff)
Member Author

Haven't benchmarked this, but the function dispatch is more expensive than the computation itself.
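To make the point concrete, here is a sketch of what replacing the two removed calls with direct attribute updates could look like. This is an assumption about the shape of the change, not a copy of the merged code; `Collection`, `GroupDispatching`, and `GroupInlined` are hypothetical names, and only `update_nbytes`, `nbytes_total`, and `prefix` are taken from the diff.

```python
class Collection:
    def __init__(self):
        self.nbytes_total = 0

    def update_nbytes(self, diff):
        self.nbytes_total += diff


class GroupDispatching(Collection):
    """Two method calls (one through super()) for two integer additions."""

    def __init__(self, prefix):
        Collection.__init__(self)
        self.prefix = prefix

    def update_nbytes(self, diff):
        super().update_nbytes(diff)
        self.prefix.update_nbytes(diff)


class GroupInlined(Collection):
    """Same bookkeeping with the additions written out in place."""

    def __init__(self, prefix):
        Collection.__init__(self)
        self.prefix = prefix

    def update_nbytes(self, diff):
        self.nbytes_total += diff
        self.prefix.nbytes_total += diff


if __name__ == "__main__":
    for cls in (GroupDispatching, GroupInlined):
        group = cls(Collection())
        group.update_nbytes(100)
        group.update_nbytes(-30)
        print(cls.__name__, group.nbytes_total, group.prefix.nbytes_total)
```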

Comment on lines +1035 to +1037
+        duration_us = max(round((stop - start) * 1e6), 0)
+        self._duration_us += duration_us
+        self._all_durations_us[action] += duration_us
Member Author

This violates DRY, but I think it's worth it considering that it's just two places and the complexity is low.
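For reference, a worked sketch of the inlined duration bookkeeping. The expression and the attribute names `_duration_us` and `_all_durations_us` come from the diff; the surrounding class `Durations` and the method name `add_duration` are hypothetical. The computation converts a start/stop pair in seconds to whole microseconds and clamps negative intervals (for example from clock skew) to zero.

```python
from collections import defaultdict


class Durations:
    def __init__(self):
        self._duration_us = 0
        self._all_durations_us = defaultdict(int)

    def add_duration(self, action, start, stop):
        # Seconds -> integer microseconds, never negative.
        duration_us = max(round((stop - start) * 1e6), 0)
        self._duration_us += duration_us
        self._all_durations_us[action] += duration_us


if __name__ == "__main__":
    d = Durations()
    d.add_duration("compute", 1.0, 1.5)   # 0.5 s -> 500000 us
    d.add_duration("transfer", 5.0, 4.0)  # negative interval -> 0 us
    print(d._duration_us, dict(d._all_durations_us))
```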

@@ -1033,7 +1015,7 @@ class TaskPrefix(TaskCollection):
     __slots__ = tuple(__annotations__)

     def __init__(self, name: str):
-        super().__init__(name)
+        TaskCollection.__init__(self, name)
Member Author

I think it still makes some sense to keep the base class for common state instantiation and getters, but we should try to avoid super().

    @property
    def types(self) -> Set[str]:
        """The result types of this collection"""
        return self._types.keys()

    def update_nbytes(self, diff: int) -> None:
        self.nbytes_total += diff

    @staticmethod
    def _calculate_duration_us(start: float, stop: float) -> int:
Member Author

TODO: this is now dead code (the dispatch is not worth it)

Comment on lines 985 to +986
     def __len__(self) -> int:
-        return sum(self.states.values())
+        return self._size
Member Author

I've seen plugins accessing this and this can also be costly if calculated on demand
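A sketch of the idea, assuming the size is maintained incrementally as tasks are added and removed. How `_size` is actually updated is not shown in this hunk, so the `add` and `remove` methods and the class name `CountingCollection` below are hypothetical; only `states`, `_size`, and `__len__` appear in the diff.

```python
STATES = ["released", "waiting", "processing", "memory"]


class CountingCollection:
    def __init__(self):
        self.states = dict.fromkeys(STATES, 0)
        self._size = 0

    def add(self, state):
        self.states[state] += 1
        self._size += 1

    def remove(self, state):
        self.states[state] -= 1
        self._size -= 1

    def transition(self, old, new):
        # Moving a task between states leaves the total unchanged.
        self.states[old] -= 1
        self.states[new] += 1

    def __len__(self):
        # O(1) instead of summing the per-state counters on every call.
        return self._size


if __name__ == "__main__":
    c = CountingCollection()
    for _ in range(3):
        c.add("released")
    c.transition("released", "processing")
    c.remove("released")
    print(len(c), sum(c.states.values()))
```

The cached counter trades a little extra work on every add and remove for a constant-time `len()`, which matters if callers such as plugins query the size frequently.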

Contributor

github-actions bot commented Oct 23, 2024

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    25 files  +    1      25 suites  +1   10h 28m 45s ⏱️ + 30m 13s
 4 132 tests ±    0   4 014 ✅  -     2    110 💤  -  1  8 ❌ +3 
47 730 runs  +1 413  45 625 ✅ +1 367  2 096 💤 +42  9 ❌ +4 

For more details on these failures, see this check.

Results for commit e660bbf. ± Comparison against base commit 48509b3.

♻️ This comment has been updated with latest results.

Member

@hendrikmakait hendrikmakait left a comment

Thanks, @fjetter

@hendrikmakait hendrikmakait merged commit ab0d36c into dask:main Oct 24, 2024
19 of 30 checks passed
@fjetter fjetter deleted the perf_remove_dispatching_task_collection branch October 24, 2024 13:58