diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 311ec76..1118cd1 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -58,4 +58,4 @@ jobs:
       with:
         file: coverage.xml
         flags: unittests
-        name: py-${{ matrix.python-version }}-${{ matrix.os }}
\ No newline at end of file
+        name: py-${{ matrix.python-version }}-${{ matrix.os }}
diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
index 3bdcf5a..1917492 100644
--- a/.github/workflows/deploy.yml
+++ b/.github/workflows/deploy.yml
@@ -53,4 +53,4 @@ jobs:
       uses: pypa/gh-action-pypi-publish@master
       with:
         user: __token__
-        password: ${{ secrets.PYPI_TOKEN }}
\ No newline at end of file
+        password: ${{ secrets.PYPI_TOKEN }}
diff --git a/.gitignore b/.gitignore
index 0afe9c7..9b6bccf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -133,4 +133,4 @@ dmypy.json
 .env
 
 docker_test/*
-docker_test
\ No newline at end of file
+docker_test
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index c974de1..b89295e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -21,4 +21,4 @@ repos:
     - id: check-yaml
     - id: debug-statements
     - id: double-quote-string-fixer
-    - id: requirements-txt-fixer
\ No newline at end of file
+    - id: requirements-txt-fixer
diff --git a/README.md b/README.md
index bc76fd5..0adc859 100644
--- a/README.md
+++ b/README.md
@@ -64,4 +64,3 @@
 if __name__ == '__main__':
     main()
 ```
-
diff --git a/apscheduler_di/__init__.py b/apscheduler_di/__init__.py
index 9e74110..f91ff7c 100644
--- a/apscheduler_di/__init__.py
+++ b/apscheduler_di/__init__.py
@@ -1,8 +1,13 @@
 from ._binding import UnableToResolveDependencyError
 from .decorator import ContextSchedulerDecorator
 
-__maintainer__ = "GLEF1X"
+__maintainer__ = 'GLEF1X'
 
-__version__ = "0.0.4"
+__version__ = '0.0.4'
 
-__all__ = ('ContextSchedulerDecorator', "__maintainer__", "__version__", "UnableToResolveDependencyError")
+__all__ = (
+    'ContextSchedulerDecorator',
+    '__maintainer__',
+    '__version__',
+    'UnableToResolveDependencyError',
+)
diff --git a/apscheduler_di/_binding.py b/apscheduler_di/_binding.py
index ca2ef1c..d704cb9 100644
--- a/apscheduler_di/_binding.py
+++ b/apscheduler_di/_binding.py
@@ -1,11 +1,11 @@
 import functools
 import inspect
 from inspect import Signature, _ParameterKind
-from typing import TypeVar, Callable, Any, Dict, get_type_hints, List
+from typing import Any, Callable, Dict, List, TypeVar, get_type_hints
 
-from rodi import Services, CannotResolveTypeException, GetServiceContext
+from rodi import CannotResolveTypeException, GetServiceContext, Services
 
-T = TypeVar("T", bound=Callable[..., Any])
+T = TypeVar('T', bound=Callable[..., Any])
 
 
 class NormalizationError(Exception):
@@ -13,7 +13,7 @@ class NormalizationError(Exception):
 
 
 class ParamSpec:
-    __slots__ = ("name", "annotation", "kind", "default", "_str")
+    __slots__ = ('name', 'annotation', 'kind', 'default', '_str')
 
     def __init__(self, name, annotation, kind, default, str_repr):
         self.name = name
@@ -29,20 +29,23 @@ def __str__(self) -> str:
 class UnsupportedSignatureError(NormalizationError):
     def __init__(self, method: Callable[..., Any]):
         super().__init__(
-            f"Cannot normalize method `{method.__qualname__}` because its "
-            f"signature contains *args, or *kwargs, or keyword only parameters. "
-            f"If you use a decorator, please use `functools.@wraps` "
-            f"with your wrapper, to fix this error."
+            f'Cannot normalize method `{method.__qualname__}` because its '
+            f'signature contains *args, or *kwargs, or keyword only parameters. '
+            f'If you use a decorator, please use `functools.@wraps` '
+            f'with your wrapper, to fix this error.'
         )
 
 
 class UnableToResolveDependencyError(Exception):
     def __init__(self, message: str, third_party_di_lib_exception: Exception):
-        super().__init__(f"Unable to resolve the dependency: {message}")
+        super().__init__(f'Unable to resolve the dependency: {message}')
         self.third_party_di_lib_exception = third_party_di_lib_exception
 
 
-def normalize_job_executable(func: Callable[..., Any], services: Services, ) -> Callable[..., Any]:
+def normalize_job_executable(
+    func: Callable[..., Any],
+    services: Services,
+) -> Callable[..., Any]:
     check_if_signature_is_supported(func)
 
     if inspect.iscoroutinefunction(func) or inspect.iscoroutine(func):
@@ -57,8 +60,8 @@ def check_if_signature_is_supported(func: Callable[..., Any]) -> None:
     params = get_func_param_specs(func)
 
     if any(
-            str(param).startswith("*") or param.kind.value == _ParameterKind.KEYWORD_ONLY
-            for param in params.values()
+        str(param).startswith('*') or param.kind.value == _ParameterKind.KEYWORD_ONLY
+        for param in params.values()
     ):
         raise UnsupportedSignatureError(func)
 
@@ -80,8 +83,8 @@ def get_func_param_specs(method: Callable[..., Any]) -> Dict[str, ParamSpec]:
 
 
 def _get_method_annotations_or_throw(method: Callable[..., Any]) -> Dict[str, Any]:
-    method_locals = getattr(method, "_locals", None)
-    method_globals = getattr(method, "_globals", None)
+    method_locals = getattr(method, '_locals', None)
+    method_globals = getattr(method, '_globals', None)
     return get_type_hints(method, globalns=method_globals, localns=method_locals)
 
 
@@ -101,7 +104,9 @@ async def wrapper(*args, **kwargs):
     return wrapper
 
 
-def resolve_dependencies(services: Services, func: Callable[..., Any], **kwargs: Any) -> List[Any]:
+def resolve_dependencies(
+    services: Services, func: Callable[..., Any], **kwargs: Any
+) -> List[Any]:
     dependencies = []
     for param_spec in get_func_param_specs(func).values():
         with GetServiceContext() as context:
diff --git a/apscheduler_di/_events.py b/apscheduler_di/_events.py
index 994f287..e66cd36 100644
--- a/apscheduler_di/_events.py
+++ b/apscheduler_di/_events.py
@@ -1,8 +1,5 @@
 import functools
-from typing import (
-    Any,
-    Callable,
-)
+from typing import Any, Callable
 
 from apscheduler.schedulers.base import BaseScheduler
 from rodi import Container
@@ -14,15 +11,14 @@ def __init__(self, scheduler: BaseScheduler, ctx: Container, on_event: int) -> N
         self._on_event = on_event
         self._ctx = ctx
 
-    def __iadd__(self, handler: Callable[..., Any]) -> "ApschedulerEvent":
+    def __iadd__(self, handler: Callable[..., Any]) -> 'ApschedulerEvent':
         with_context = functools.partial(handler, ctx=self._ctx)
         self._scheduler.add_listener(callback=with_context, mask=self._on_event)
         return self
 
-    def __isub__(self, handler: Callable[..., Any]) -> "ApschedulerEvent":
+    def __isub__(self, handler: Callable[..., Any]) -> 'ApschedulerEvent':
         self._scheduler.remove_listener(callback=handler)
         return self
 
     def __len__(self) -> int:
         return len(self._scheduler._listeners)  # noqa
-
diff --git a/apscheduler_di/_helper.py b/apscheduler_di/_helper.py
index 9a86ea5..b7c4397 100644
--- a/apscheduler_di/_helper.py
+++ b/apscheduler_di/_helper.py
@@ -1,10 +1,11 @@
-from typing import Tuple, Any, Dict, Callable
+from typing import Any, Callable, Dict, Tuple
 
 from apscheduler_di._binding import get_func_param_specs
 
 
-def get_missing_arguments(func: Callable[..., Any], args: Tuple[Any, ...],
-                          kwargs: Dict[Any, Any]) -> Dict[str, None]:
+def get_missing_arguments(
+    func: Callable[..., Any], args: Tuple[Any, ...], kwargs: Dict[Any, Any]
+) -> Dict[str, None]:
     """
     Get arguments to skip ValueError with traceback "The following arguments have not been supplied"
     It raises, because we injecting our dependencies using functools.wraps and do not change
diff --git a/apscheduler_di/_inject.py b/apscheduler_di/_inject.py
index af11130..c075b26 100644
--- a/apscheduler_di/_inject.py
+++ b/apscheduler_di/_inject.py
@@ -16,17 +16,20 @@ def inject_dependencies_to_scheduler(scheduler: BaseScheduler, ctx: Container):
     prepared_context = ctx.build_provider()
     for job_store in scheduler._jobstores.values():  # type: BaseJobStore # noqa
+
         def func_get_due_jobs_with_context(c: BaseJobStore, now: datetime):
             jobs: List[Job] = type(job_store).get_due_jobs(c, now)
-            return _convert_raw_to_easy_maintainable_jobs(scheduler, jobs, prepared_context)
+            return _convert_raw_to_easy_maintainable_jobs(
+                scheduler, jobs, prepared_context
+            )
 
-        job_store.get_due_jobs = types.MethodType(func_get_due_jobs_with_context, job_store)
+        job_store.get_due_jobs = types.MethodType(
+            func_get_due_jobs_with_context, job_store
+        )
 
 
 def _convert_raw_to_easy_maintainable_jobs(
-        scheduler: BaseScheduler,
-        jobs: List[Job],
-        ctx: Services
+    scheduler: BaseScheduler, jobs: List[Job], ctx: Services
 ) -> List[Job]:
     unsafe_pickling = False
     if isinstance(scheduler, BlockingScheduler):
@@ -39,7 +42,9 @@ def _convert_raw_to_easy_maintainable_jobs(
     return jobs
 
 
-def _make_jobs_shared(jobs: List[Job], scheduler: BaseScheduler, ctx: Services) -> List[SharedJob]:
+def _make_jobs_shared(
+    jobs: List[Job], scheduler: BaseScheduler, ctx: Services
+) -> List[SharedJob]:
     shared_jobs: List[SharedJob] = []
     for job in jobs:
         if isinstance(job, SharedJob):
diff --git a/apscheduler_di/_serialization.py b/apscheduler_di/_serialization.py
index 387c8cb..c9a35c6 100644
--- a/apscheduler_di/_serialization.py
+++ b/apscheduler_di/_serialization.py
@@ -1,5 +1,5 @@
 import pickle
-from typing import Callable, Any
+from typing import Any, Callable
 
 from apscheduler.job import Job
 from apscheduler.schedulers.base import BaseScheduler
@@ -16,17 +16,16 @@ def _load_func_from_ref(func_ref: str, ctx: Services) -> Callable[..., Any]:
 
 
 class SharedJob(Job):
-
     def __init__(self, scheduler: BaseScheduler, ctx: Services, **kwargs):
-        fn_args = kwargs.get("args", ())
-        fn_kwargs = kwargs.get("kwargs", {})
-        fn = kwargs["func"]
+        fn_args = kwargs.get('args', ())
+        fn_kwargs = kwargs.get('kwargs', {})
+        fn = kwargs['func']
         if not callable(fn):
             fn = ref_to_obj(fn)
-        kwargs["kwargs"].update(get_missing_arguments(fn, fn_args, fn_kwargs))
+        kwargs['kwargs'].update(get_missing_arguments(fn, fn_args, fn_kwargs))
 
-        if kwargs.get("version") is not None:
-            kwargs.pop("version")  # pragma: no cover
+        if kwargs.get('version') is not None:
+            kwargs.pop('version')  # pragma: no cover
         super().__init__(scheduler, **kwargs)
         self.kwargs = {}
         self._ctx = ctx
@@ -39,10 +38,10 @@ def __getstate__(self):
     def __setstate__(self, state):
         if state.get('version', 1) > 1:
             raise ValueError(  # pragma: no cover
-                'Job has version %s, but only version 1 can be handled' %
-                state['version']
+                'Job has version %s, but only version 1 can be handled'
+                % state['version']
             )
-        self._ctx = pickle.loads(state["ctx"])
+        self._ctx = pickle.loads(state['ctx'])
         self.id = state['id']
         self.func_ref = state['func']
         self.args = state['args']
diff --git a/apscheduler_di/decorator.py b/apscheduler_di/decorator.py
index 5de0f6e..e439a66 100644
--- a/apscheduler_di/decorator.py
+++ b/apscheduler_di/decorator.py
@@ -7,30 +7,22 @@
 import warnings
 from datetime import datetime
 from threading import RLock
-from typing import Any, Dict, Callable, Optional, List, Tuple
+from typing import Any, Callable, Dict, List, Optional, Tuple
 
 import six
-from apscheduler.events import (
-    EVENT_ALL,
-    SchedulerEvent,
-    EVENT_SCHEDULER_STARTED,
-    EVENT_JOBSTORE_ADDED,
-    EVENT_JOB_ERROR,
-    EVENT_SCHEDULER_SHUTDOWN,
-    EVENT_SCHEDULER_PAUSED,
-    EVENT_SCHEDULER_RESUMED,
-    EVENT_EXECUTOR_ADDED,
-    EVENT_EXECUTOR_REMOVED,
-    EVENT_JOB_ADDED,
-    EVENT_JOB_MODIFIED,
-    EVENT_JOB_SUBMITTED,
-    EVENT_JOB_MISSED,
-    EVENT_ALL_JOBS_REMOVED
-)
+from apscheduler.events import (EVENT_ALL, EVENT_ALL_JOBS_REMOVED,
+                                EVENT_EXECUTOR_ADDED, EVENT_EXECUTOR_REMOVED,
+                                EVENT_JOB_ADDED, EVENT_JOB_ERROR,
+                                EVENT_JOB_MISSED, EVENT_JOB_MODIFIED,
+                                EVENT_JOB_SUBMITTED, EVENT_JOBSTORE_ADDED,
+                                EVENT_SCHEDULER_PAUSED,
+                                EVENT_SCHEDULER_RESUMED,
+                                EVENT_SCHEDULER_SHUTDOWN,
+                                EVENT_SCHEDULER_STARTED, SchedulerEvent)
 from apscheduler.job import Job
 from apscheduler.jobstores.base import BaseJobStore
 from apscheduler.schedulers.asyncio import AsyncIOScheduler, run_in_event_loop
-from apscheduler.schedulers.base import BaseScheduler, STATE_STOPPED
+from apscheduler.schedulers.base import STATE_STOPPED, BaseScheduler
 from apscheduler.schedulers.blocking import BlockingScheduler
 from apscheduler.util import undefined
 from rodi import Container
@@ -41,40 +33,64 @@
 class ContextSchedulerDecorator(BaseScheduler):
-
     def __init__(self, scheduler: BaseScheduler):
         self.ctx = Container()
         self._scheduler = scheduler
         # Scheduler events
-        self.on_startup = ApschedulerEvent(scheduler, self.ctx, on_event=EVENT_SCHEDULER_STARTED)
-        self.on_shutdown = ApschedulerEvent(scheduler, self.ctx, on_event=EVENT_SCHEDULER_SHUTDOWN)
-        self.on_pause = ApschedulerEvent(scheduler, self.ctx, on_event=EVENT_SCHEDULER_PAUSED)
-        self.on_resume = ApschedulerEvent(scheduler, self.ctx, on_event=EVENT_SCHEDULER_RESUMED)
+        self.on_startup = ApschedulerEvent(
+            scheduler, self.ctx, on_event=EVENT_SCHEDULER_STARTED
+        )
+        self.on_shutdown = ApschedulerEvent(
+            scheduler, self.ctx, on_event=EVENT_SCHEDULER_SHUTDOWN
+        )
+        self.on_pause = ApschedulerEvent(
+            scheduler, self.ctx, on_event=EVENT_SCHEDULER_PAUSED
+        )
+        self.on_resume = ApschedulerEvent(
+            scheduler, self.ctx, on_event=EVENT_SCHEDULER_RESUMED
+        )
         # executor events
-        self.on_executor_add = ApschedulerEvent(scheduler, self.ctx,
-                                                on_event=EVENT_EXECUTOR_ADDED)
-        self.on_executor_removed = ApschedulerEvent(scheduler, self.ctx,
-                                                    on_event=EVENT_EXECUTOR_REMOVED)
+        self.on_executor_add = ApschedulerEvent(
+            scheduler, self.ctx, on_event=EVENT_EXECUTOR_ADDED
+        )
+        self.on_executor_removed = ApschedulerEvent(
+            scheduler, self.ctx, on_event=EVENT_EXECUTOR_REMOVED
+        )
         # job events
-        self.on_job_error = ApschedulerEvent(scheduler, self.ctx, on_event=EVENT_JOB_ERROR)
-        self.on_job_added = ApschedulerEvent(scheduler, self.ctx, on_event=EVENT_JOB_ADDED)
-        self.on_job_modified = ApschedulerEvent(scheduler, self.ctx, on_event=EVENT_JOB_MODIFIED)
-        self.on_job_submitted = ApschedulerEvent(scheduler, self.ctx, on_event=EVENT_JOB_SUBMITTED)
-        self.on_job_missed = ApschedulerEvent(scheduler, self.ctx, on_event=EVENT_JOB_MISSED)
+        self.on_job_error = ApschedulerEvent(
+            scheduler, self.ctx, on_event=EVENT_JOB_ERROR
+        )
+        self.on_job_added = ApschedulerEvent(
+            scheduler, self.ctx, on_event=EVENT_JOB_ADDED
+        )
+        self.on_job_modified = ApschedulerEvent(
+            scheduler, self.ctx, on_event=EVENT_JOB_MODIFIED
+        )
+        self.on_job_submitted = ApschedulerEvent(
+            scheduler, self.ctx, on_event=EVENT_JOB_SUBMITTED
+        )
+        self.on_job_missed = ApschedulerEvent(
+            scheduler, self.ctx, on_event=EVENT_JOB_MISSED
+        )
         self.on_all_jobs_removed = ApschedulerEvent(
             scheduler, self.ctx, on_event=EVENT_ALL_JOBS_REMOVED
         )
 
-        self.on_startup += lambda event, ctx: inject_dependencies_to_scheduler(self._scheduler, ctx)
+        self.on_startup += lambda event, ctx: inject_dependencies_to_scheduler(
+            self._scheduler, ctx
+        )
         self._scheduler.add_listener(
             lambda event: inject_dependencies_to_scheduler(self._scheduler, self.ctx),
-            mask=EVENT_JOBSTORE_ADDED
+            mask=EVENT_JOBSTORE_ADDED,
+        )
+        self._scheduler._dispatch_event = types.MethodType(
+            _dispatch_event, self._scheduler
         )
-        self._scheduler._dispatch_event = types.MethodType(_dispatch_event, self._scheduler)
 
         if isinstance(self._scheduler, BlockingScheduler):
+
             def save_ssl_context(obj):
                 return obj.__class__, (obj.protocol,)
@@ -92,21 +108,23 @@ def shutdown(self, wait: bool = True) -> None:
             return run_in_event_loop(self._scheduler.shutdown)()
         self._scheduler.shutdown(wait=wait)
 
-    def add_job(self,
-                func: Callable[..., Any],
-                trigger: Optional[str] = None,
-                args: Tuple[Any, ...] = (),
-                kwargs: Optional[Dict[Any, Any]] = None,
-                id: Optional[str] = None,
-                name: Optional[str] = None,
-                misfire_grace_time: int = undefined,
-                coalesce: bool = undefined,
-                max_instances: int = undefined,
-                next_run_time: datetime = undefined,
-                jobstore: str = 'default',
-                executor: str = 'default',
-                replace_existing: bool = False,
-                **trigger_args) -> Job:  # pragma: no cover
+    def add_job(
+        self,
+        func: Callable[..., Any],
+        trigger: Optional[str] = None,
+        args: Tuple[Any, ...] = (),
+        kwargs: Optional[Dict[Any, Any]] = None,
+        id: Optional[str] = None,
+        name: Optional[str] = None,
+        misfire_grace_time: int = undefined,
+        coalesce: bool = undefined,
+        max_instances: int = undefined,
+        next_run_time: datetime = undefined,
+        jobstore: str = 'default',
+        executor: str = 'default',
+        replace_existing: bool = False,
+        **trigger_args
+    ) -> Job:  # pragma: no cover
         if kwargs is None:
             kwargs = {}
@@ -123,9 +141,13 @@ def add_job(self,
             'misfire_grace_time': misfire_grace_time,
             'coalesce': coalesce,
             'max_instances': max_instances,
-            'next_run_time': next_run_time
+            'next_run_time': next_run_time,
         }
-        job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if value is not undefined)
+        job_kwargs = dict(
+            (key, value)
+            for key, value in six.iteritems(job_kwargs)
+            if value is not undefined
+        )
         job = Job(self._scheduler, **job_kwargs)
         job.kwargs = kwargs
@@ -133,29 +155,43 @@ def add_job(self,
         with self._scheduler._jobstores_lock:
             if self._scheduler.state == STATE_STOPPED:
                 self._scheduler._pending_jobs.append((job, jobstore, replace_existing))
-                self._scheduler._logger.info('Adding job tentatively -- it will be properly scheduled when '
-                                             'the scheduler starts')
+                self._scheduler._logger.info(
+                    'Adding job tentatively -- it will be properly scheduled when '
+                    'the scheduler starts'
+                )
             else:
                 self._scheduler._real_add_job(job, jobstore, replace_existing)
         return job
 
-    def scheduled_job(self,
-                      trigger: Optional[str] = None,
-                      args: Optional[Tuple[Any, ...]] = None,
-                      kwargs: Optional[Dict[Any, Any]] = None,
-                      id: Optional[str] = None,
-                      name: Optional[str] = None,
-                      misfire_grace_time: int = undefined,
-                      coalesce: bool = undefined,
-                      max_instances: int = undefined,
-                      next_run_time: datetime = undefined,
-                      jobstore: str = 'default',
-                      executor: str = 'default',
-                      **trigger_args) -> Job:
+    def scheduled_job(
+        self,
+        trigger: Optional[str] = None,
+        args: Optional[Tuple[Any, ...]] = None,
+        kwargs: Optional[Dict[Any, Any]] = None,
+        id: Optional[str] = None,
+        name: Optional[str] = None,
+        misfire_grace_time: int = undefined,
+        coalesce: bool = undefined,
+        max_instances: int = undefined,
+        next_run_time: datetime = undefined,
+        jobstore: str = 'default',
+        executor: str = 'default',
+        **trigger_args
+    ) -> Job:
         return self._scheduler.scheduled_job(
-            trigger, args, kwargs, id, name, misfire_grace_time, coalesce,
-            max_instances, next_run_time, jobstore, executor, **trigger_args
+            trigger,
+            args,
+            kwargs,
+            id,
+            name,
+            misfire_grace_time,
+            coalesce,
+            max_instances,
+            next_run_time,
+            jobstore,
+            executor,
+            **trigger_args
         )
 
     def resume_job(self, job_id: str, jobstore: Optional[str] = None) -> Optional[Job]:
@@ -164,9 +200,13 @@ def resume_job(self, job_id: str, jobstore: Optional[str] = None) -> Optional[Jo
     def resume(self):
         self._scheduler.resume()
 
-    def reschedule_job(self, job_id: str, jobstore: Optional[str] = None,
-                       trigger: Optional[str] = None,
-                       **trigger_args: Any) -> Job:
+    def reschedule_job(
+        self,
+        job_id: str,
+        jobstore: Optional[str] = None,
+        trigger: Optional[str] = None,
+        **trigger_args: Any
+    ) -> Job:
         return self._scheduler.reschedule_job(job_id, jobstore, trigger, **trigger_args)
 
     def remove_listener(self, callback: Callable[..., Any]) -> None:
@@ -196,7 +236,9 @@ def print_jobs(self, jobstore: Optional[str] = None, out: Optional[Any] = None):
     def pause_job(self, job_id: str, jobstore: Optional[str] = None) -> Job:
         return self._scheduler.pause_job(job_id, jobstore)
 
-    def modify_job(self, job_id: str, jobstore: Optional[str] = None, **changes: Any) -> Job:
+    def modify_job(
+        self, job_id: str, jobstore: Optional[str] = None, **changes: Any
+    ) -> Job:
         return self._scheduler.modify_job(job_id, jobstore, **changes)
 
     def get_jobs(self, jobstore: Optional[str] = None, pending=None) -> List[Job]:
@@ -259,17 +301,19 @@ def _dispatch_event(s: BaseScheduler, event: SchedulerEvent):
         _run_callback(s, callback=cb, event=event)
 
 
-def _run_callback(scheduler: BaseScheduler, callback: Callable[..., Any], event: SchedulerEvent):
+def _run_callback(
+    scheduler: BaseScheduler, callback: Callable[..., Any], event: SchedulerEvent
+):
     try:
         if _is_function_coroutine(callback):
             if isinstance(scheduler, AsyncIOScheduler):
                 scheduler._eventloop.create_task(callback(event))
             else:
                 warnings.warn(
-                    "Running async events with sync scheduler"
-                    "shall lead to unpredictable behavior and unclosed descriptors or sockets",
+                    'Running async events with sync scheduler'
+                    'shall lead to unpredictable behavior and unclosed descriptors or sockets',
                     UserWarning,
-                    stacklevel=3
+                    stacklevel=3,
                 )
                 asyncio.create_task(callback(event))
         else:
diff --git a/examples/__init__.py b/examples/__init__.py
index 8b13789..e69de29 100644
--- a/examples/__init__.py
+++ b/examples/__init__.py
@@ -1 +0,0 @@
-
diff --git a/examples/advanced.py b/examples/advanced.py
index f4a16b7..19821d5 100644
--- a/examples/advanced.py
+++ b/examples/advanced.py
@@ -1,6 +1,6 @@
 import asyncio
 import dataclasses
-from abc import abstractmethod, ABC
+from abc import ABC, abstractmethod
 from typing import Dict
 
 from apscheduler.jobstores.redis import RedisJobStore
@@ -10,8 +10,8 @@
 
 # pip install redis
 job_stores: Dict[str, RedisJobStore] = {
-    "default": RedisJobStore(
-        jobs_key="dispatched_trips_jobs", run_times_key="dispatched_trips_running"
+    'default': RedisJobStore(
+        jobs_key='dispatched_trips_jobs', run_times_key='dispatched_trips_running'
     )
 }
@@ -36,7 +36,7 @@ def get_by_id(self, _id) -> Cat:
 class PostgresCatsRepository(ICatsRepository):
     def get_by_id(self, _id) -> Cat:
         # TODO: implement logic to use a connection to the db
-        return Cat("...")
+        return Cat('...')
 
 
 async def some_job(repository: ICatsRepository, config: Config):
@@ -46,14 +46,14 @@ async def some_job(repository: ICatsRepository, config: Config):
 
 async def some_infinite_cycle():
     while True:
-        await asyncio.sleep(.5)
+        await asyncio.sleep(0.5)
 
 
 def run_scheduler():
     scheduler = ContextSchedulerDecorator(AsyncIOScheduler(jobstores=job_stores))
     scheduler.ctx.add_instance(PostgresCatsRepository(), ICatsRepository)
     scheduler.ctx.add_instance(Config(some_param=1), Config)
-    scheduler.add_job(some_job, trigger="interval", seconds=5)
+    scheduler.add_job(some_job, trigger='interval', seconds=5)
     scheduler.start()
diff --git a/examples/events.py b/examples/events.py
index 60285f0..127e5a2 100644
--- a/examples/events.py
+++ b/examples/events.py
@@ -8,11 +8,11 @@
 
 
 def tick():
-    raise Exception("Ooops, something went wrong(")
+    raise Exception('Ooops, something went wrong(')
 
 
 async def handle_job_error(event: JobExecutionEvent, ctx: Container):
-    print("OH NO!! WE GOT EXCEPTION")  # handling exception here
+    print('OH NO!! WE GOT EXCEPTION')  # handling exception here
 
 
 async def main():
diff --git a/examples/factories.py b/examples/factories.py
index 02a73b3..9b37312 100644
--- a/examples/factories.py
+++ b/examples/factories.py
@@ -1,5 +1,5 @@
 import asyncio
-from typing import Dict, Any, Optional
+from typing import Any, Dict, Optional
 
 from apscheduler.jobstores.redis import RedisJobStore
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
@@ -8,8 +8,8 @@
 
 # pip install redis
 job_stores: Dict[str, RedisJobStore] = {
-    "default": RedisJobStore(
-        jobs_key="dispatched_trips_jobs", run_times_key="dispatched_trips_running"
+    'default': RedisJobStore(
+        jobs_key='dispatched_trips_jobs', run_times_key='dispatched_trips_running'
     )
 }
@@ -17,8 +17,8 @@
 
 
 class DatabaseSession:
-
-    async def query(self) -> Any: ...
+    async def query(self) -> Any:
+        ...
 
 
 async def make_request_to_database(session: DatabaseSession):
@@ -27,10 +27,7 @@ async def make_request_to_database(session: DatabaseSession):
 
 async def main():
     scheduler = ContextSchedulerDecorator(AsyncIOScheduler(jobstores=job_stores))
-    scheduler.ctx.add_scoped_by_factory(
-        lambda: DatabaseSession(),
-        DatabaseSession
-    )
+    scheduler.ctx.add_scoped_by_factory(lambda: DatabaseSession(), DatabaseSession)
     scheduler.add_job(make_request_to_database, 'interval', seconds=3)
     scheduler.start()
diff --git a/examples/quick_start.py b/examples/quick_start.py
index 794b8d2..0fc377b 100644
--- a/examples/quick_start.py
+++ b/examples/quick_start.py
@@ -8,16 +8,15 @@
 
 # pip install redis
 job_stores: Dict[str, RedisJobStore] = {
-    "default": RedisJobStore(
-        jobs_key="dispatched_trips_jobs", run_times_key="dispatched_trips_running"
+    'default': RedisJobStore(
+        jobs_key='dispatched_trips_jobs', run_times_key='dispatched_trips_running'
     )
 }
 
 
 class Tack:
-
     def tack(self):
-        print("Tack!")
+        print('Tack!')
 
 
 def tick(tack: Tack):
diff --git a/examples/use_with_aiogram.py b/examples/use_with_aiogram.py
index ca213dc..d467faf 100644
--- a/examples/use_with_aiogram.py
+++ b/examples/use_with_aiogram.py
@@ -9,22 +9,22 @@
 
 # pip install redis
 job_stores: Dict[str, RedisJobStore] = {
-    "default": RedisJobStore(
-        jobs_key="dispatched_trips_jobs", run_times_key="dispatched_trips_running"
+    'default': RedisJobStore(
+        jobs_key='dispatched_trips_jobs', run_times_key='dispatched_trips_running'
     )
 }
 
 
 async def send_message_by_timer(bot: Bot):
-    await bot.send_message(chat_id=1219185039, text="Hello world!")
+    await bot.send_message(chat_id=1219185039, text='Hello world!')
 
 
 def run_scheduler():
-    token = "BOT TOKEN"
+    token = 'BOT TOKEN'
     bot = Bot(token)
     scheduler = ContextSchedulerDecorator(AsyncIOScheduler(jobstores=job_stores))
     scheduler.ctx.add_instance(bot, Bot)
-    scheduler.add_job(send_message_by_timer, trigger="interval", seconds=5)
+    scheduler.add_job(send_message_by_timer, trigger='interval', seconds=5)
     scheduler.start()
diff --git a/examples/with_additional_kwargs.py b/examples/with_additional_kwargs.py
index 7e26494..0bcdaa1 100644
--- a/examples/with_additional_kwargs.py
+++ b/examples/with_additional_kwargs.py
@@ -1,5 +1,5 @@
 import asyncio
-from typing import Dict, Any
+from typing import Any, Dict
 
 from apscheduler.jobstores.redis import RedisJobStore
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
@@ -8,32 +8,32 @@
 
 # pip install redis
 job_stores: Dict[str, RedisJobStore] = {
-    "default": RedisJobStore(
-        jobs_key="dispatched_trips_jobs", run_times_key="dispatched_trips_running"
+    'default': RedisJobStore(
+        jobs_key='dispatched_trips_jobs', run_times_key='dispatched_trips_running'
     )
 }
 
 
 class Broadcaster:
-
     def start(self) -> None:
-        print("Tack!")
+        print('Tack!')
 
 
 def broadcast(user_id: int, broadcaster: Broadcaster, additional_data: Dict[str, Any]):
-    print(f"Executing broadcast task using {user_id=} and {broadcaster=} and {additional_data}")
+    print(
+        f'Executing broadcast task using {user_id=} and {broadcaster=} and {additional_data}'
+    )
 
 
 async def main():
     scheduler = ContextSchedulerDecorator(AsyncIOScheduler(jobstores=job_stores))
     scheduler.ctx.add_instance(Broadcaster(), Broadcaster)
-    scheduler.add_job(broadcast, 'interval', seconds=3, kwargs={
-        "user_id": 543534,
-        "additional_data": {
-            "hello": "world"
-        }
-
-    })
+    scheduler.add_job(
+        broadcast,
+        'interval',
+        seconds=3,
+        kwargs={'user_id': 543534, 'additional_data': {'hello': 'world'}},
+    )
     scheduler.start()
diff --git a/tests/mocks/mock_injectable.py b/tests/mocks/mock_injectable.py
index 532d4f4..10da8be 100644
--- a/tests/mocks/mock_injectable.py
+++ b/tests/mocks/mock_injectable.py
@@ -16,7 +16,6 @@ def injectable(d: Dependency):
 
 
 class MockJobStore(BaseJobStore):
-
     def lookup_job(self, job_id):
         pass
 
@@ -43,7 +42,6 @@ def remove_all_jobs(self):
 
 
 class MockJob(Job):
-
     def __init__(self, scheduler, func):
         super().__init__(scheduler)
         self.func = func
@@ -57,13 +55,10 @@ def _get_run_times(self, now):
 
 
 class InjectableMockOfScheduler(MockScheduler):
-
     def __init__(self, **options):
         super().__init__(**options)
-        self._jobstores = {
-            "default": MockJobStore()
-        }
+        self._jobstores = {'default': MockJobStore()}
 
     def get_jobs(self, jobstore=None, pending=None):
-        mock_jobstore = self._jobstores["default"]
+        mock_jobstore = self._jobstores['default']
         return mock_jobstore.get_due_jobs(datetime.now())
diff --git a/tests/mocks/mock_schedulers.py b/tests/mocks/mock_schedulers.py
index 6c806af..6dd7580 100644
--- a/tests/mocks/mock_schedulers.py
+++ b/tests/mocks/mock_schedulers.py
@@ -4,13 +4,11 @@
 
 
 class MockExecutor(BaseExecutor):
-
     def _do_submit_job(self, job, run_times):
         pass
 
 
 class MockScheduler(BaseScheduler):
-
     def __init__(self, **options):
         super().__init__(**options)
         self.executor = MockExecutor()
diff --git a/tests/test_binding_utils.py b/tests/test_binding_utils.py
index 92f0b24..544bd6c 100644
--- a/tests/test_binding_utils.py
+++ b/tests/test_binding_utils.py
@@ -1,7 +1,8 @@
 import pytest
 from rodi import Container, Services
 
-from apscheduler_di._binding import UnsupportedSignatureError, normalize_job_executable
+from apscheduler_di._binding import (UnsupportedSignatureError,
+                                     normalize_job_executable)
 
 
 class ExampleOne:
diff --git a/tests/test_events.py b/tests/test_events.py
index f281a7d..4b8742d 100644
--- a/tests/test_events.py
+++ b/tests/test_events.py
@@ -1,8 +1,7 @@
 from typing import List
 
 import pytest
-from apscheduler.events import EVENT_ALL
-from apscheduler.events import EVENT_SCHEDULER_STARTED
+from apscheduler.events import EVENT_ALL, EVENT_SCHEDULER_STARTED
 from rodi import Container
 
 from apscheduler_di._events import ApschedulerEvent
@@ -30,10 +29,12 @@ def some_event_handler(event):
     pass
 
 
-@pytest.fixture(name="event_collection", scope="function")
+@pytest.fixture(name='event_collection', scope='function')
 def event_fixture() -> ApschedulerEvent:
     stub_scheduler = StubScheduler()
-    event = ApschedulerEvent(stub_scheduler, on_event=EVENT_SCHEDULER_STARTED, ctx=Container())
+    event = ApschedulerEvent(
+        stub_scheduler, on_event=EVENT_SCHEDULER_STARTED, ctx=Container()
+    )
     yield event
     stub_scheduler.clear()
diff --git a/tests/test_helper.py b/tests/test_helper.py
index 18fa8a5..dda7bfc 100644
--- a/tests/test_helper.py
+++ b/tests/test_helper.py
@@ -9,4 +9,4 @@ def some_function(arg1: int, arg2: str, arg3: Tuple):
 
 def test_get_missing_arguments():
     missing_arguments = get_missing_arguments(some_function, args=(), kwargs={})
-    assert missing_arguments == {"arg1": None, "arg2": None, "arg3": None}
+    assert missing_arguments == {'arg1': None, 'arg2': None, 'arg3': None}
diff --git a/tests/test_serialization.py b/tests/test_serialization.py
index 876b74c..db88fef 100644
--- a/tests/test_serialization.py
+++ b/tests/test_serialization.py
@@ -7,7 +7,7 @@
 
 from apscheduler_di import ContextSchedulerDecorator
 from apscheduler_di._serialization import SharedJob
-from tests.mocks.mock_schedulers import MockScheduler, MockBlockingScheduler
+from tests.mocks.mock_schedulers import MockBlockingScheduler, MockScheduler
 
 
 def example_job():
@@ -21,14 +21,19 @@ def example_job2(ssl_context: ssl.SSLContext):
 def test_pickle_special_job():
     container = Container()
     provider = container.build_provider()
-    job = SharedJob(MockScheduler(), provider,
-                    func=example_job,
-                    trigger=CronTrigger(second=5),
-                    kwargs={}, args=tuple(), executor='some_executor',
-                    misfire_grace_time=10,
-                    coalesce=False,
-                    max_instances=1,
-                    next_run_time=datetime.now())
+    job = SharedJob(
+        MockScheduler(),
+        provider,
+        func=example_job,
+        trigger=CronTrigger(second=5),
+        kwargs={},
+        args=tuple(),
+        executor='some_executor',
+        misfire_grace_time=10,
+        coalesce=False,
+        max_instances=1,
+        next_run_time=datetime.now(),
+    )
     dumped = pickle.dumps(job)
     assert pickle.loads(dumped) == job
@@ -37,15 +42,18 @@ def test_pickle_special_job_with_ssl_context():
     container = Container()
     provider = container.build_provider()
     scheduler = ContextSchedulerDecorator(MockBlockingScheduler())
-    job = SharedJob(scheduler, provider,
-                    func=example_job2,
-                    trigger=CronTrigger(second=5),
-                    kwargs={},
-                    args=(ssl.SSLContext(),),
-                    executor='some_executor',
-                    misfire_grace_time=10,
-                    coalesce=False,
-                    max_instances=1,
-                    next_run_time=datetime.now())
+    job = SharedJob(
+        scheduler,
+        provider,
+        func=example_job2,
+        trigger=CronTrigger(second=5),
+        kwargs={},
+        args=(ssl.SSLContext(),),
+        executor='some_executor',
+        misfire_grace_time=10,
+        coalesce=False,
+        max_instances=1,
+        next_run_time=datetime.now(),
+    )
     dumped = pickle.dumps(job)
     assert pickle.loads(dumped) == job
diff --git a/tests/test_shared_job.py b/tests/test_shared_job.py
index 9971aee..5836f2f 100644
--- a/tests/test_shared_job.py
+++ b/tests/test_shared_job.py
@@ -15,14 +15,19 @@ def example_job():
 
 def test_pickle_shared_job_with_ref_to_func():
     ctx = Container().build_provider()
-    shared_job = SharedJob(MockScheduler(), ctx,
-                           func=obj_to_ref(example_job),
-                           trigger=CronTrigger(second=5),
-                           kwargs={}, args=tuple(), executor='some_executor',
-                           misfire_grace_time=10,
-                           coalesce=False,
-                           max_instances=1,
-                           next_run_time=datetime.now())
+    shared_job = SharedJob(
+        MockScheduler(),
+        ctx,
+        func=obj_to_ref(example_job),
+        trigger=CronTrigger(second=5),
+        kwargs={},
+        args=tuple(),
+        executor='some_executor',
+        misfire_grace_time=10,
+        coalesce=False,
+        max_instances=1,
+        next_run_time=datetime.now(),
+    )
     pickled = pickle.dumps(shared_job)
     assert pickle.loads(pickled) == shared_job
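
For reviewers who want to smoke-test the reformatted API end to end, below is a minimal sketch distilled from the patched `examples/quick_start.py`. It assumes `apscheduler`, `rodi`, and this package are installed; the in-memory `MemoryJobStore` stands in for the `RedisJobStore` used in the examples so the snippet runs without a Redis server (that substitution, and the three-second run window, are mine, not part of the patch):

```python
import asyncio

from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from apscheduler_di import ContextSchedulerDecorator


class Tack:
    def tack(self):
        print('Tack!')


def tick(tack: Tack):
    # `tack` is never passed explicitly; ContextSchedulerDecorator resolves it
    # from the rodi container registered below when the job fires.
    tack.tack()


async def main():
    scheduler = ContextSchedulerDecorator(
        AsyncIOScheduler(jobstores={'default': MemoryJobStore()})
    )
    scheduler.ctx.add_instance(Tack(), Tack)
    scheduler.add_job(tick, 'interval', seconds=1)
    scheduler.start()
    await asyncio.sleep(3)  # let a couple of ticks fire, then exit


if __name__ == '__main__':
    asyncio.run(main())
```

This exercises the behavior the patch reformats but does not change: `inject_dependencies_to_scheduler` intercepts each job store's `get_due_jobs`, so `tick` receives its `Tack` instance from the container without any `kwargs` plumbing.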