Skip to content

Commit

Permalink
Merge pull request #247 from Bubobubobubobubo/dephasing-fix
Browse files Browse the repository at this point in the history
Fix occasional double iterations when reloading runners
  • Loading branch information
Bubobubobubobubo authored Jul 18, 2023
2 parents ca7d12f + 58fd9df commit 48ab9fa
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions sardine_core/scheduler/async_runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import heapq
import inspect
import math
import traceback
from collections import deque
from dataclasses import dataclass
Expand Down Expand Up @@ -190,7 +191,7 @@ class AsyncRunner:
_can_correct_interval: bool
_expected_time: float
_last_interval: float
_last_iteration_called: bool
_last_expected_time: float
_last_state: Optional[FunctionState]

def __init__(self, name: str):
Expand All @@ -213,7 +214,7 @@ def __init__(self, name: str):
self._can_correct_interval = False
self._expected_time = 0.0
self._last_interval = 0.0
self._last_iteration_called = False
self._last_expected_time = -math.inf
self._last_state = None

def __repr__(self):
Expand Down Expand Up @@ -471,13 +472,25 @@ def _get_next_deadline(self, period: Union[float, int]) -> float:
Returns:
float: The deadline for the next interval.
"""
# We only want to use the expected time if the last iteration
# ran its function normally - if the runner skipped the iteration,
# that means we haven't yet reached the deadline
if self._last_iteration_called:
time = self._expected_time
else:
time = self.clock.time
# If this is called earlier than the expected time, we should use
# the current time to avoid calculating the next beat too far ahead,
# which would cause an unusually long gap between iterations.
#
# If this is called after the expected time has already passed,
# we should continue from that iteration and ignore the current time.
# This allows returning an overdue deadline potentially caused by a
# high delta, letting missed iterations fire ASAP.
#
# Given the above requirements, this would be the ideal solution:
# time = min(self.clock.time, self._expected_time)
#
# However, this is complicated by SleepHandler which does not guarantee
# that a successful iteration will never be earlier than the deadline.
# As such, we will additionally prevent the time from being sooner than
# the last successful iteration.
# If we ignored this and allowed deadlines earlier than the last iteration,
# the above solution could potentially trigger non-missed iterations too early.
time = max(self._last_expected_time, min(self.clock.time, self._expected_time))

self._check_snap(time)
if self.snap is not None:
Expand Down Expand Up @@ -518,7 +531,7 @@ async def _runner(self):
print(f"[yellow][Stopped [red]{self.name}[/red]][/yellow]")

def _prepare(self):
self._last_iteration_called = False
self._last_expected_time = -math.inf
self._last_state = self._get_state()
self._swimming = True
self._stop = False
Expand Down Expand Up @@ -597,7 +610,7 @@ async def _run_once(self):
name=f"asyncrunner-func-{self.name}",
)
finally:
self._last_iteration_called = True
self._last_expected_time = self._expected_time

async def _call_func(self, func, args, kwargs):
"""Calls the given function and optionally applies time shift
Expand Down Expand Up @@ -668,5 +681,4 @@ def _revert_state(self):
self._has_reverted = True

def _skip_iteration(self) -> None:
self._last_iteration_called = False
self.swim()

0 comments on commit 48ab9fa

Please sign in to comment.