From ff293346b08c3c63d410830e4c68cd9ad428e78f Mon Sep 17 00:00:00 2001 From: Ivan Ogasawara Date: Wed, 19 Jun 2024 22:03:55 -0400 Subject: [PATCH] fix: Fix `get` command in the result attribute, and add `get_group_tasks` (#11) --- .makim.yaml | 12 ---- containers/compose.yaml | 2 - example/tasks/parallel.py | 66 ----------------- example/tasks/serial.py | 66 ----------------- examples/README.md | 8 +++ .../redis_queue_between_tasks}/.gitignore | 0 examples/redis_queue_between_tasks/README.md | 11 +++ .../redis_queue_between_tasks}/__init__.py | 0 .../redis_queue_between_tasks}/app.py | 58 +++++++++++---- examples/redis_queue_between_tasks/run.sh | 14 ++++ .../redis_queue_between_tasks}/settings.py | 0 .../tasks/__init__.py | 0 .../redis_queue_between_tasks}/tasks/app.py | 0 .../tasks/config.py | 0 .../tasks/parallel.py | 66 +++++++++++++++++ .../redis_queue_between_tasks/tasks/serial.py | 70 +++++++++++++++++++ pyproject.toml | 2 +- src/retsu/celery.py | 58 ++++++++++++--- src/retsu/tracking.py | 7 ++ tests/test_task_celery_parallel.py | 20 ++---- tests/test_task_celery_serial.py | 20 ++---- 21 files changed, 285 insertions(+), 195 deletions(-) delete mode 100644 example/tasks/parallel.py delete mode 100644 example/tasks/serial.py create mode 100644 examples/README.md rename {example => examples/redis_queue_between_tasks}/.gitignore (100%) create mode 100644 examples/redis_queue_between_tasks/README.md rename {example => examples/redis_queue_between_tasks}/__init__.py (100%) rename {example => examples/redis_queue_between_tasks}/app.py (57%) create mode 100755 examples/redis_queue_between_tasks/run.sh rename {example => examples/redis_queue_between_tasks}/settings.py (100%) rename {example => examples/redis_queue_between_tasks}/tasks/__init__.py (100%) rename {example => examples/redis_queue_between_tasks}/tasks/app.py (100%) rename {example => examples/redis_queue_between_tasks}/tasks/config.py (100%) create mode 100644 
examples/redis_queue_between_tasks/tasks/parallel.py create mode 100644 examples/redis_queue_between_tasks/tasks/serial.py diff --git a/.makim.yaml b/.makim.yaml index bc4708d..4b2a4f9 100644 --- a/.makim.yaml +++ b/.makim.yaml @@ -56,18 +56,6 @@ groups: run: | pytest ${{ args.path }} ${{ args.params }} - example: - help: Run the example app - shell: bash - run: | - set -ex - sugar build - sugar ext restart --options -d - sleep 5 - cd example/ - celery -A tasks.app worker --loglevel=debug & - python app.py - setup: help: Run the setup for the unit tests shell: bash diff --git a/containers/compose.yaml b/containers/compose.yaml index 1e4388b..e2b49e3 100644 --- a/containers/compose.yaml +++ b/containers/compose.yaml @@ -4,13 +4,11 @@ services: valkey: image: valkey/valkey:7.2.5-alpine hostname: valkey - container_name: valkey ports: - 6379:6379 # celery: # hostname: celery - # container_name: celery # build: # context: .. # dockerfile: containers/celery/Dockerfile diff --git a/example/tasks/parallel.py b/example/tasks/parallel.py deleted file mode 100644 index 6b2be2a..0000000 --- a/example/tasks/parallel.py +++ /dev/null @@ -1,66 +0,0 @@ -"""My retsu tasks.""" - -from __future__ import annotations - -from time import sleep - -import celery - -from retsu import ResultTask -from retsu.celery import ParallelCeleryTask - -from .config import app, redis_client - - -@app.task -def task_parallel_a1(a: int, b: int, task_id: str) -> int: # type: ignore - """Define the task_a1.""" - sleep(a + b) - print("running task a1") - result = a + b - redis_client.set(f"result-{task_id}", result) - return result - - -@app.task -def task_parallel_a2(task_id: str) -> int: # type: ignore - """Define the task_a2.""" - print("running task a2") - result = redis_client.get(f"result-{task_id}") - return result - - -@app.task -def task_parallel_final(results, task_id: str) -> int: # type: ignore - """Define the final_task.""" - print("running final task") - - result = 
redis_client.get(f"result-{task_id}") - final_result = f"Final result: {result}" - print(final_result) - - task_result = ResultTask() - - task_result.save(task_id=task_id, result=final_result) - - return final_result - - -class MyParallelTask1(ParallelCeleryTask): - """MyParallelTask1.""" - - def request(self, a: int, b: int) -> str: - """Receive the request for processing.""" - return super().request(a=a, b=b) - - def get_chord_tasks( - self, a: int, b: int, task_id: str - ) -> list[celery.Signature]: - """Define the list of tasks for celery chord.""" - return ( - [ - task_parallel_a1.s(a, b, task_id), - task_parallel_a2.s(task_id), - ], - task_parallel_final.s(task_id), - ) diff --git a/example/tasks/serial.py b/example/tasks/serial.py deleted file mode 100644 index 77377fb..0000000 --- a/example/tasks/serial.py +++ /dev/null @@ -1,66 +0,0 @@ -"""My retsu tasks.""" - -from __future__ import annotations - -from time import sleep - -import celery - -from retsu import ResultTask -from retsu.celery import SerialCeleryTask - -from .config import app, redis_client - - -@app.task -def task_serial_a1(a: int, b: int, task_id: str) -> int: # type: ignore - """Define the task_a1.""" - sleep(a + b) - print("running task a1") - result = a + b - redis_client.set(f"result-{task_id}", result) - return result - - -@app.task -def task_serial_a2(task_id: str) -> int: # type: ignore - """Define the task_a2.""" - print("running task a2") - result = redis_client.get(f"result-{task_id}") - return result - - -@app.task -def task_serial_final(results, task_id: str) -> int: # type: ignore - """Define the final_task.""" - print("running final task") - - result = redis_client.get(f"result-{task_id}") - final_result = f"Final result: {result}" - print(final_result) - - task_result = ResultTask() - - task_result.save(task_id=task_id, result=final_result) - - return final_result - - -class MySerialTask1(SerialCeleryTask): - """MySerialTask1.""" - - def request(self, a: int, b: int) -> str: - 
"""Receive the request for processing.""" - return super().request(a=a, b=b) - - def get_chord_tasks( - self, a: int, b: int, task_id: str - ) -> tuple[list[celery.Signature], celery.Signature]: - """Define the list of tasks for celery chord.""" - return ( - [ - task_serial_a1.s(a, b, task_id), - task_serial_a2.s(task_id), - ], - task_serial_final.s(task_id), - ) diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..4fa8fed --- /dev/null +++ b/examples/README.md @@ -0,0 +1,8 @@ +# Examples of usage of Retsu + +This folder contains some examples of usage of Retsu. + +## Using redis as a queue between celery tasks + +The `redis_queue_between_tasks` folder contains an example of how to create +extra queues to establish communication across different celery chord tasks. diff --git a/example/.gitignore b/examples/redis_queue_between_tasks/.gitignore similarity index 100% rename from example/.gitignore rename to examples/redis_queue_between_tasks/.gitignore diff --git a/examples/redis_queue_between_tasks/README.md b/examples/redis_queue_between_tasks/README.md new file mode 100644 index 0000000..ae240a7 --- /dev/null +++ b/examples/redis_queue_between_tasks/README.md @@ -0,0 +1,11 @@ +# How to test + +You can run `run.sh` in your terminal, and in the web browser you can try the +following endpoints: + +- http://127.0.0.1:5000/serial/10/20 +- http://127.0.0.1:5000/parallel/10/20 +- http://127.0.0.1:5000/serial/result/[TASK_ID] +- http://127.0.0.1:5000/parallel/result/[TASK_ID] + +Remember to replace `[TASK_ID]` with the desired task id. 
diff --git a/example/__init__.py b/examples/redis_queue_between_tasks/__init__.py similarity index 100% rename from example/__init__.py rename to examples/redis_queue_between_tasks/__init__.py diff --git a/example/app.py b/examples/redis_queue_between_tasks/app.py similarity index 57% rename from example/app.py rename to examples/redis_queue_between_tasks/app.py index 615e79b..3d8e968 100644 --- a/example/app.py +++ b/examples/redis_queue_between_tasks/app.py @@ -3,7 +3,8 @@ import os import signal -from typing import Optional +from time import sleep +from typing import Any, Optional from flask import Flask from tasks import MyTaskManager @@ -40,55 +41,84 @@ def api() -> str: * parallel * status * result - """ + + Example of endpoints: + + - http://127.0.0.1:5000/serial/1/2 + - http://127.0.0.1:5000/parallel/1/2 + - http://127.0.0.1:5000/serial/result/[TASK_ID] + - http://127.0.0.1:5000/serial/status/[TASK_ID] + - http://127.0.0.1:5000/parallel/result/[TASK_ID] + - http://127.0.0.1:5000/parallel/status/[TASK_ID] + + Remember to replace `[TASK_ID]` by the desired task id. + """.replace("\n", "
") return menu @app.route("/serial//") -def serial(a: int, b: int) -> str: +def serial(a: int, b: int) -> dict[str, Any]: """Define the serial endpoint.""" task1 = task_manager.get_task("serial") key = task1.request(a=a, b=b) - return f"your task ({key}) is running now, please wait until it is done." + return {"message": f"Your task ({key}) is running now"} @app.route("/parallel//") -def parallel(a: int, b: int) -> str: +def parallel(a: int, b: int) -> dict[str, Any]: """Define the parallel endpoint.""" task2 = task_manager.get_task("parallel") key = task2.request(a=a, b=b) - return f"your task ({key}) is running now, please wait until it is done." + return {"message": f"Your task ({key}) is running now"} @app.route("/serial/status/") -def serial_status(task_id: str) -> str: +def serial_status(task_id: str) -> dict[str, Any]: """Define serial/status endpoint.""" task1 = task_manager.get_task("serial") - _status = task1.status(task_id) + _status = task1.result.status(task_id) return {"status": _status, "task_id": task_id} @app.route("/parallel/status/") -def parallel_status(task_id: str) -> str: +def parallel_status(task_id: str) -> dict[str, Any]: """Define parallel/status endpoint.""" task2 = task_manager.get_task("parallel") - _status = task2.status(task_id) + _status = task2.result.status(task_id) return {"status": _status, "task_id": task_id} @app.route("/serial/result/") -def serial_result(task_id: str) -> str: +def serial_result(task_id: str) -> dict[str, Any]: """Define serial/result endpoint.""" task1 = task_manager.get_task("serial") - return task1.get_result(task_id) + result = None + for _ in range(10): + try: + # note: with no timeout + result = task1.result.get(task_id) + break + except Exception: + sleep(1) + + if result is None: + return {"Error": "Result is not ready yet."} + return {"result": result[0]} @app.route("/parallel/result/") -def parallel_result(task_id: str) -> str: +def parallel_result(task_id: str) -> dict[str, Any]: """Define 
parallel/result endpoint.""" task2 = task_manager.get_task("parallel") - return task2.get_result(task_id) + + try: + # note: with timeout + result = task2.result.get(task_id, timeout=10) + except Exception: + return {"Error": "Result is not ready yet."} + + return {"result": result[-1]} if __name__ == "__main__": diff --git a/examples/redis_queue_between_tasks/run.sh b/examples/redis_queue_between_tasks/run.sh new file mode 100755 index 0000000..53783a8 --- /dev/null +++ b/examples/redis_queue_between_tasks/run.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash + +set -ex + +pushd ../../ + +sugar build +sugar ext restart --options -d +sleep 5 + +popd + +celery -A tasks.app worker --loglevel=debug & +python app.py diff --git a/example/settings.py b/examples/redis_queue_between_tasks/settings.py similarity index 100% rename from example/settings.py rename to examples/redis_queue_between_tasks/settings.py diff --git a/example/tasks/__init__.py b/examples/redis_queue_between_tasks/tasks/__init__.py similarity index 100% rename from example/tasks/__init__.py rename to examples/redis_queue_between_tasks/tasks/__init__.py diff --git a/example/tasks/app.py b/examples/redis_queue_between_tasks/tasks/app.py similarity index 100% rename from example/tasks/app.py rename to examples/redis_queue_between_tasks/tasks/app.py diff --git a/example/tasks/config.py b/examples/redis_queue_between_tasks/tasks/config.py similarity index 100% rename from example/tasks/config.py rename to examples/redis_queue_between_tasks/tasks/config.py diff --git a/examples/redis_queue_between_tasks/tasks/parallel.py b/examples/redis_queue_between_tasks/tasks/parallel.py new file mode 100644 index 0000000..d9615ae --- /dev/null +++ b/examples/redis_queue_between_tasks/tasks/parallel.py @@ -0,0 +1,66 @@ +"""My retsu tasks.""" + +from __future__ import annotations + +from time import sleep + +import celery + +from retsu.celery import ParallelCeleryTask + +from .config import app, redis_client + + +@app.task +def 
task_parallel_a_plus_b(a: int, b: int, task_id: str) -> int: # type: ignore + """Define the task_parallel_a_plus_b.""" + sleep(a + b) + print("running task_parallel_a_plus_b") + result = a + b + redis_client.set(f"parallel-result-a-plus-b-{task_id}", result) + return result + + +@app.task +def task_parallel_result_plus_10(task_id: str) -> int: # type: ignore + """Define the task_parallel_result_plus_10.""" + print("running task_parallel_result_plus_10") + result = None + while result is None: + result = redis_client.get(f"parallel-result-a-plus-b-{task_id}") + sleep(1) + + final_result = int(result) + 10 + redis_client.set(f"parallel-result-plus-10-{task_id}", final_result) + return final_result + + +@app.task +def task_parallel_result_square(results, task_id: str) -> int: # type: ignore + """Define the task_parallel_result_square.""" + print("running task_parallel_result_square") + result = None + while result is None: + result = redis_client.get(f"parallel-result-plus-10-{task_id}") + sleep(1) + return int(result) ** 2 + + +class MyParallelTask1(ParallelCeleryTask): + """MyParallelTask1.""" + + def request(self, a: int, b: int) -> str: + """Receive the request for processing.""" + return super().request(a=a, b=b) + + def get_chord_tasks( + self, a: int, b: int, task_id: str + ) -> list[celery.Signature]: + """Define the list of tasks for celery chord.""" + return ( + [ + task_parallel_a_plus_b.s(a, b, task_id), + task_parallel_result_plus_10.s(task_id), + ], + task_parallel_result_square.s(task_id), + ) diff --git a/examples/redis_queue_between_tasks/tasks/serial.py b/examples/redis_queue_between_tasks/tasks/serial.py new file mode 100644 index 0000000..ce3d02a --- /dev/null +++ b/examples/redis_queue_between_tasks/tasks/serial.py @@ -0,0 +1,70 @@ +"""My retsu tasks.""" + +from __future__ import annotations + +from time import sleep + +import celery + +from retsu.celery import SerialCeleryTask + +from .config import app, redis_client + + +@app.task +def 
task_serial_a_plus_b(a: int, b: int, task_id: str) -> int: # type: ignore + """Define the task_serial_a_plus_b.""" + sleep(a + b) + print("running task_serial_a_plus_b") + result = a + b + redis_client.set(f"serial-result-a-plus-b-{task_id}", result) + return result + + +@app.task +def task_serial_result_plus_10(task_id: str) -> int: # type: ignore + """Define the task_serial_result_plus_10.""" + print("running task_serial_result_plus_10") + previous_result = None + while previous_result is None: + previous_result = redis_client.get(f"serial-result-a-plus-b-{task_id}") + sleep(1) + + previous_result_int = int(previous_result) + result = previous_result_int + 10 + redis_client.set(f"serial-result-plus-10-{task_id}", result) + return result + + +@app.task +def task_serial_result_square(results, task_id: str) -> int: # type: ignore + """Define the task_serial_result_square.""" + print("running task_serial_result_square") + previous_result = None + while previous_result is None: + previous_result = redis_client.get(f"serial-result-plus-10-{task_id}") + sleep(1) + + previous_result_int = int(previous_result) + result = previous_result_int**2 + return result + + +class MySerialTask1(SerialCeleryTask): + """MySerialTask1.""" + + def request(self, a: int, b: int) -> str: + """Receive the request for processing.""" + return super().request(a=a, b=b) + + def get_chord_tasks( + self, a: int, b: int, task_id: str + ) -> tuple[list[celery.Signature], celery.Signature]: + """Define the list of tasks for celery chord.""" + return ( + [ + task_serial_a_plus_b.s(a, b, task_id), + task_serial_result_plus_10.s(task_id), + ], + task_serial_result_square.s(task_id), + ) diff --git a/pyproject.toml b/pyproject.toml index 2659708..e3b4378 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -113,4 +113,4 @@ ignore_missing_imports = true warn_unused_ignores = true warn_redundant_casts = true warn_unused_configs = true -exclude = ["example/", "scripts/"] +exclude = ["examples/", "scripts/"] 
diff --git a/src/retsu/celery.py b/src/retsu/celery.py index 9674371..24204fa 100644 --- a/src/retsu/celery.py +++ b/src/retsu/celery.py @@ -18,18 +18,30 @@ class CeleryTask: def task(self, *args, task_id: str, **kwargs) -> Any: # type: ignore """Define the task to be executed.""" chord_tasks, chord_callback = self.get_chord_tasks( - *args, task_id=task_id, **kwargs + *args, + task_id=task_id, + **kwargs, + ) + group_tasks = self.get_group_tasks( + *args, + task_id=task_id, + **kwargs, + ) + chain_tasks = self.get_chain_tasks( + *args, + task_id=task_id, + **kwargs, ) - chain_tasks = self.get_chain_tasks(*args, task_id=task_id, **kwargs) # start the tasks if chord_tasks: - if chord_callback: - workflow_chord = chord(chord_tasks, chord_callback) - else: - workflow_chord = group(chord_tasks) + workflow_chord = chord(chord_tasks, chord_callback) promise_chord = workflow_chord.apply_async() + if group_tasks: + workflow_group = group(group_tasks) + promise_group = workflow_group.apply_async() + if chain_tasks: workflow_chain = chain(chord_tasks) promise_chain = workflow_chain.apply_async() @@ -37,10 +49,26 @@ def task(self, *args, task_id: str, **kwargs) -> Any: # type: ignore # wait for the tasks results: list[Any] = [] if chord_tasks: - results.extend(promise_chord.get()) + chord_result = promise_chord.get() + if isinstance(chord_result, list): + results.extend(chord_result) + else: + results.append(chord_result) + + if group_tasks: + group_result = promise_group.get() + if isinstance(group_result, list): + results.extend(group_result) + else: + results.append(group_result) if chain_tasks: - results.append(promise_chain.get()) + chain_result = promise_chain.get() + + if isinstance(chain_result, list): + results.extend(chain_result) + else: + results.append(chain_result) return results @@ -59,6 +87,20 @@ def get_chord_tasks( # type: ignore callback_task = None return (chord_tasks, callback_task) + def get_group_tasks( # type: ignore + self, *args, **kwargs + ) -> 
list[celery.Signature]: + """ + Run tasks with group. + + Return + ------ + list: + list of task signatures to be executed as a celery group + """ + group_tasks: list[celery.Signature] = [] + return group_tasks + def get_chain_tasks( # type: ignore self, *args, **kwargs ) -> list[celery.Signature]: diff --git a/src/retsu/tracking.py b/src/retsu/tracking.py index 1b9703a..4b00908 100644 --- a/src/retsu/tracking.py +++ b/src/retsu/tracking.py @@ -119,6 +119,13 @@ def get(self, task_id: str, timeout: Optional[int] = None) -> Any: "Timeout(get): Task result is not ready yet. " f"Task status: {status}" ) + + elif self.status(task_id) != "completed": + status = self.status(task_id) + raise Exception( + "Timeout(get): Task result is not ready yet. " + f"Task status: {status}" + ) result = self.metadata.get(task_id, "result") return pickle.loads(result) if result else result diff --git a/tests/test_task_celery_parallel.py b/tests/test_task_celery_parallel.py index 15b8123..d4ea08e 100644 --- a/tests/test_task_celery_parallel.py +++ b/tests/test_task_celery_parallel.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import Generator, Optional +from typing import Generator import celery import pytest @@ -16,32 +16,26 @@ class MyResultTask(ParallelCeleryTask): """Task for the test.""" - def get_chord_tasks( # type: ignore + def get_group_tasks( # type: ignore self, *args, **kwargs - ) -> tuple[list[celery.Signature], Optional[celery.Signature]]: + ) -> list[celery.Signature]: """Define the list of tasks for celery chord.""" x = kwargs.get("x") y = kwargs.get("y") task_id = kwargs.get("task_id") - return ( - [task_sum.s(x, y, task_id)], - None, - ) + return [task_sum.s(x, y, task_id)] class MyTimestampTask(ParallelCeleryTask): """Task for the test.""" - def get_chord_tasks( # type: ignore + def get_group_tasks( # type: ignore self, *args, **kwargs - ) -> tuple[list[celery.Signature], Optional[celery.Signature]]: + ) -> list[celery.Signature]: 
"""Define the list of tasks for celery chord.""" seconds = kwargs.get("seconds") task_id = kwargs.get("task_id") - return ( - [task_sleep.s(seconds, task_id)], - None, - ) + return [task_sleep.s(seconds, task_id)] @pytest.fixture diff --git a/tests/test_task_celery_serial.py b/tests/test_task_celery_serial.py index 75c0d59..46f6f23 100644 --- a/tests/test_task_celery_serial.py +++ b/tests/test_task_celery_serial.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import Generator, Optional +from typing import Generator import celery import pytest @@ -16,32 +16,26 @@ class MyResultTask(SerialCeleryTask): """Task for the test.""" - def get_chord_tasks( # type: ignore + def get_group_tasks( # type: ignore self, *args, **kwargs - ) -> tuple[list[celery.Signature], Optional[celery.Signature]]: + ) -> list[celery.Signature]: """Define the list of tasks for celery chord.""" x = kwargs.get("x") y = kwargs.get("y") task_id = kwargs.get("task_id") - return ( - [task_sum.s(x, y, task_id)], - None, - ) + return [task_sum.s(x, y, task_id)] class MyTimestampTask(SerialCeleryTask): """Task for the test.""" - def get_chord_tasks( # type: ignore + def get_group_tasks( # type: ignore self, *args, **kwargs - ) -> tuple[list[celery.Signature], Optional[celery.Signature]]: + ) -> list[celery.Signature]: """Define the list of tasks for celery chord.""" seconds = kwargs.get("seconds") task_id = kwargs.get("task_id") - return ( - [task_sleep.s(seconds, task_id)], - None, - ) + return [task_sleep.s(seconds, task_id)] @pytest.fixture