Skip to content

Commit

Permalink
fix: Fix get command in the result attribute, and add `get_group_ta…
Browse files Browse the repository at this point in the history
…sks` (#11)
  • Loading branch information
xmnlab authored Jun 20, 2024
1 parent 56f72be commit ff29334
Show file tree
Hide file tree
Showing 21 changed files with 285 additions and 195 deletions.
12 changes: 0 additions & 12 deletions .makim.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,6 @@ groups:
run: |
pytest ${{ args.path }} ${{ args.params }}
example:
help: Run the example app
shell: bash
run: |
set -ex
sugar build
sugar ext restart --options -d
sleep 5
cd example/
celery -A tasks.app worker --loglevel=debug &
python app.py
setup:
help: Run the setup for the unit tests
shell: bash
Expand Down
2 changes: 0 additions & 2 deletions containers/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ services:
valkey:
image: valkey/valkey:7.2.5-alpine
hostname: valkey
container_name: valkey
ports:
- 6379:6379

# celery:
# hostname: celery
# container_name: celery
# build:
# context: ..
# dockerfile: containers/celery/Dockerfile
Expand Down
66 changes: 0 additions & 66 deletions example/tasks/parallel.py

This file was deleted.

66 changes: 0 additions & 66 deletions example/tasks/serial.py

This file was deleted.

8 changes: 8 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Examples of usage of Retsu

This folder contains some examples of usage of Retsu.

## Using redis as a queue between celery tasks

The `redis_queue_between_tasks` folder contains an example about how to create
extra queues to establish communication across different celery chord tasks.
File renamed without changes.
11 changes: 11 additions & 0 deletions examples/redis_queue_between_tasks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# How to test

You can run `run.sh` in your terminal, and in the web browser you can try the
following endpoints:

- http://127.0.0.1:5000/serial/10/20
- http://127.0.0.1:5000/parallel/10/20
- http://127.0.0.1:5000/serial/result/[TASK_ID]
- http://127.0.0.1:5000/parallel/result/[TASK_ID]

Remember to replace `[TASK_ID]` by the desired task id.
File renamed without changes.
58 changes: 44 additions & 14 deletions example/app.py → examples/redis_queue_between_tasks/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import os
import signal

from typing import Optional
from time import sleep
from typing import Any, Optional

from flask import Flask
from tasks import MyTaskManager
Expand Down Expand Up @@ -40,55 +41,84 @@ def api() -> str:
* parallel
* status
* result
"""
Example of endpoints:
- http://127.0.0.1:5000/serial/1/2
- http://127.0.0.1:5000/parallel/1/2
- http://127.0.0.1:5000/serial/result/[TASK_ID]
- http://127.0.0.1:5000/serial/status/[TASK_ID]
- http://127.0.0.1:5000/parallel/result/[TASK_ID]
- http://127.0.0.1:5000/parallel/status/[TASK_ID]
Remember to replace `[TASK_ID]` by the desired task id.
""".replace("\n", "<br/>")

return menu


@app.route("/serial/<int:a>/<int:b>")
def serial(a: int, b: int) -> str:
def serial(a: int, b: int) -> dict[str, Any]:
"""Define the serial endpoint."""
task1 = task_manager.get_task("serial")
key = task1.request(a=a, b=b)
return f"your task ({key}) is running now, please wait until it is done."
return {"message": f"Your task ({key}) is running now"}


@app.route("/parallel/<int:a>/<int:b>")
def parallel(a: int, b: int) -> str:
def parallel(a: int, b: int) -> dict[str, Any]:
"""Define the parallel endpoint."""
task2 = task_manager.get_task("parallel")
key = task2.request(a=a, b=b)
return f"your task ({key}) is running now, please wait until it is done."
return {"message": f"Your task ({key}) is running now"}


@app.route("/serial/status/<string:task_id>")
def serial_status(task_id: str) -> str:
def serial_status(task_id: str) -> dict[str, Any]:
"""Define serial/status endpoint."""
task1 = task_manager.get_task("serial")
_status = task1.status(task_id)
_status = task1.result.status(task_id)
return {"status": _status, "task_id": task_id}


@app.route("/parallel/status/<string:task_id>")
def parallel_status(task_id: str) -> str:
def parallel_status(task_id: str) -> dict[str, Any]:
"""Define parallel/status endpoint."""
task2 = task_manager.get_task("parallel")
_status = task2.status(task_id)
_status = task2.result.status(task_id)
return {"status": _status, "task_id": task_id}


@app.route("/serial/result/<string:task_id>")
def serial_result(task_id: str) -> str:
def serial_result(task_id: str) -> dict[str, Any]:
"""Define serial/result endpoint."""
task1 = task_manager.get_task("serial")
return task1.get_result(task_id)
result = None
for _ in range(10):
try:
# note: with no timeout
result = task1.result.get(task_id)
break
except Exception:
sleep(1)

if result is None:
return {"Error": "Result is not ready yet."}
return {"result": result[0]}


@app.route("/parallel/result/<string:task_id>")
def parallel_result(task_id: str) -> str:
def parallel_result(task_id: str) -> dict[str, Any]:
"""Define parallel/result endpoint."""
task2 = task_manager.get_task("parallel")
return task2.get_result(task_id)

try:
# note: with timeout
result = task2.result.get(task_id, timeout=10)
except Exception:
return {"Error": "Result is not ready yet."}

return {"result": result[-1]}


if __name__ == "__main__":
Expand Down
14 changes: 14 additions & 0 deletions examples/redis_queue_between_tasks/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env bash

set -ex

pushd ../../

sugar build
sugar ext restart --options -d
sleep 5

popd

celery -A tasks.app worker --loglevel=debug &
python app.py
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
66 changes: 66 additions & 0 deletions examples/redis_queue_between_tasks/tasks/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""My retsu tasks."""

from __future__ import annotations

from time import sleep

import celery

from retsu.celery import ParallelCeleryTask

from .config import app, redis_client


@app.task
def task_parallel_a_plus_b(a: int, b: int, task_id: str) -> int: # type: ignore
"""Define the task_parallel_a_plus_b."""
sleep(a + b)
print("running task_parallel_a_plus_b")
result = a + b
redis_client.set(f"parallel-result-a-plus-b-{task_id}", result)
return result


@app.task
def task_parallel_result_plus_10(task_id: str) -> int: # type: ignore
"""Define the task_parallel_result_plus_10."""
print("running task_parallel_result_plus_10")
result = None
while result is None:
result = redis_client.get(f"parallel-result-a-plus-b-{task_id}")
sleep(1)

final_result = int(result) + 10
redis_client.set(f"parallel-result-plus-10-{task_id}", final_result)
return final_result


@app.task
def task_parallel_result_square(results, task_id: str) -> int: # type: ignore
"""Define the task_parallel_result_square."""
print("running task_parallel_result_square")
result = None
while result is None:
result = redis_client.get(f"parallel-result-plus-10-{task_id}")
sleep(1)
return int(result) ** 2


class MyParallelTask1(ParallelCeleryTask):
"""MyParallelTask1."""

def request(self, a: int, b: int) -> str:
"""Receive the request for processing."""
return super().request(a=a, b=b)

def get_chord_tasks(
self, a: int, b: int, task_id: str
) -> list[celery.Signature]:
"""Define the list of tasks for celery chord."""
return (
[
task_parallel_a_plus_b.s(a, b, task_id),
task_parallel_result_plus_10.s(task_id),
],
task_parallel_result_square.s(task_id),
)
Loading

0 comments on commit ff29334

Please sign in to comment.