Skip to content

Commit

Permalink
Move all content to serial structure
Browse files Browse the repository at this point in the history
  • Loading branch information
esloch committed Jun 28, 2024
1 parent 5a882d9 commit 9fa7dfb
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 125 deletions.
1 change: 1 addition & 0 deletions examples/redis_queue_gathering_serial_task/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Example of usage of retsu."""
128 changes: 3 additions & 125 deletions examples/redis_queue_gathering_serial_task/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,132 +1,10 @@
"""Example of usage retsu with a celery app."""
"""Tasks for the example."""

import logging
import time

from typing import Any

import celery


# from .config import app, redis_client
from celery_app import app # type: ignore
from retsu import TaskManager
from retsu.celery import ParallelCeleryTask, SerialCeleryTask

from .libs.back_cleaning import clean_intermediate_files
from .libs.back_clustering import cluster_preprocessed_corpuses
from .libs.back_plotting import generate_plot
from .libs.back_preparing import prepare_corpuses
from .libs.back_preprocessing import preprocess_prepared_corpuses
from .libs.back_process import back_process_articles

logger = logging.getLogger(__name__)


@app.task
def task_preprocess_prepared_corpuses(research: Any, task_id: str) -> Any:
"""Preprocess corpuses for a given research."""
return preprocess_prepared_corpuses(research)


@app.task
def task_prepare_corpuses(research: Any, task_id: str) -> Any:
"""Prepare corpuses for a given research."""
return prepare_corpuses(research)


@app.task
def task_preprocess_prepared_corpuses(research: Any, task_id: str) -> Any:
"""Preprocess corpuses for a given research."""
return preprocess_prepared_corpuses(research)


@app.task
def task_cluster_preprocessed_corpuses(research: Any, task_id: str) -> Any:
"""Cluster corpuses for a given research."""
return cluster_preprocessed_corpuses(research)


@app.task
def task_clean_intermediate_files(research: Any, task_id: str) -> Any:
"""Clean intermediate files for a given research."""
return clean_intermediate_files(research)


@app.task
def task_generate_plot(research: Any, task_id: str) -> Any:
"""Generate plot for a given research."""
return generate_plot(research)


@app.task
def task_articles(research: Any, task_id: str) -> Any:
"""Extract articles for a given research."""
return back_process_articles(research)


class ArticleTask(SerialCeleryTask):
"""Task to handle articles processing."""

def get_group_tasks(
self, research: Any, task_id: str
) -> list[celery.Signature]:
return [task_articles.s(research, task_id)]


class PreparingTask(SerialCeleryTask):
"""Task to handle corpus preparation."""

def get_group_tasks(
self, research: Any, task_id: str
) -> list[celery.Signature]:
return [
task_prepare_corpuses.s(research, task_id),
]


class PreprocessingTask(SerialCeleryTask):
"""Task to handle corpus preprocessing."""

def get_group_tasks(
self, research: Any, task_id: str
) -> list[celery.Signature]:
return [
task_preprocess_prepared_corpuses.s(research, task_id),
]


class ClusteringTask(SerialCeleryTask):
"""Task to handle corpus clustering."""

def get_group_tasks(
self, research: Any, task_id: str
) -> list[celery.Signature]:
return [
task_cluster_preprocessed_corpuses.s(research, task_id),
]


class PlottingTask(SerialCeleryTask):
"""Task to handle corpus plotting."""

def get_group_tasks(
self, research: Any, task_id: str
) -> list[celery.Signature]:
return [
task_generate_plot.s(research, task_id),
]


class CleaningTask(SerialCeleryTask):
"""Task to handle corpus cleaning."""
# from .parallel import MyParallelTask1
from .serial import ArticleTask, CleaningTask, ClusteringTask, PlottingTask, PreparingTask, PreprocessingTask

def get_group_tasks(
self, research: Any, task_id: str
) -> list[celery.Signature]:
return [
task_clean_intermediate_files.s(research, task_id),
]


class MyTaskManager(TaskManager):
Expand Down
4 changes: 4 additions & 0 deletions examples/redis_queue_gathering_serial_task/tasks/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Module for the celery app."""

# from .parallel import * # noqa: F403
from .serial import * # noqa: F403
51 changes: 51 additions & 0 deletions examples/redis_queue_gathering_serial_task/tasks/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Configuration for Celery app."""

import os
import sys

import redis

from celery import Celery

redis_host: str = os.getenv("RETSU_REDIS_HOST", "localhost")
redis_port: int = int(os.getenv("RETSU_REDIS_PORT", 6379))
redis_db: int = int(os.getenv("RETSU_REDIS_DB", 0))

redis_uri = f"redis://{redis_host}:{redis_port}/{redis_db}"

app = Celery(
"app",
broker=redis_uri,
backend=redis_uri,
)

LOG_FORMAT_PREFIX = "[%(asctime)s: %(levelname)s/%(processName)s]"

app.conf.update(
broker_url=redis_uri,
result_backend=redis_uri,
worker_log_format=f"{LOG_FORMAT_PREFIX} %(message)s",
worker_task_log_format=(
f"{LOG_FORMAT_PREFIX} %(task_name)s[%(task_id)s]: %(message)s"
),
task_annotations={"*": {"rate_limit": "10/s"}},
task_track_started=True,
task_time_limit=30 * 60,
task_soft_time_limit=30 * 60,
worker_redirect_stdouts_level="DEBUG",
)

redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
ssl=False,
)

try:
print("Pinging Redis...")
redis_client.ping()
print("Redis connection is working.")
except redis.ConnectionError as e:
print(f"Failed to connect to Redis: {e}")
sys.exit(1)
200 changes: 200 additions & 0 deletions examples/redis_queue_gathering_serial_task/tasks/serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
"""My retsu tasks."""

from __future__ import annotations

from time import sleep

import celery

from retsu.celery import SerialCeleryTask

from .config import app, redis_client


@app.task
def task_serial_a_plus_b(a: int, b: int, task_id: str) -> int: # type: ignore
"""Define the task_serial_a_plus_b."""
sleep(a + b)
print("running task_serial_a_plus_b")
result = a + b
redis_client.set(f"serial-result-a-plus-b-{task_id}", result)
return result


@app.task
def task_serial_result_plus_10(task_id: str) -> int: # type: ignore
"""Define the task_serial_result_plus_10."""
print("running task_serial_result_plus_10")
previous_result = None
while previous_result is None:
previous_result = redis_client.get(f"serial-result-a-plus-b-{task_id}")
sleep(1)

previous_result_int = int(previous_result)
result = previous_result_int + 10
redis_client.set(f"serial-result-plus-10-{task_id}", result)
return result


@app.task
def task_serial_result_square(results, task_id: str) -> int: # type: ignore
"""Define the task_serial_result_square."""
print("running task_serial_result_square")
previous_result = None
while previous_result is None:
previous_result = redis_client.get(f"serial-result-plus-10-{task_id}")
sleep(1)

previous_result_int = int(previous_result)
result = previous_result_int**2
return result


class MySerialTask1(SerialCeleryTask):
"""MySerialTask1."""

def request(self, a: int, b: int) -> str:
"""Receive the request for processing."""
return super().request(a=a, b=b)

def get_chord_tasks(
self, a: int, b: int, task_id: str
) -> tuple[list[celery.Signature], celery.Signature]:
"""Define the list of tasks for celery chord."""
return (
[
task_serial_a_plus_b.s(a, b, task_id),
task_serial_result_plus_10.s(task_id),
],
task_serial_result_square.s(task_id),
)
"""Example of usage retsu with a celery app."""

import logging
import time

from typing import Any

import celery


# from .config import app, redis_client
from celery_app import app # type: ignore
from retsu import TaskManager
from retsu.celery import ParallelCeleryTask, SerialCeleryTask

from .libs.back_cleaning import clean_intermediate_files
from .libs.back_clustering import cluster_preprocessed_corpuses
from .libs.back_plotting import generate_plot
from .libs.back_preparing import prepare_corpuses
from .libs.back_preprocessing import preprocess_prepared_corpuses
from .libs.back_process import back_process_articles

logger = logging.getLogger(__name__)


@app.task
def task_preprocess_prepared_corpuses(research: Any, task_id: str) -> Any:
"""Preprocess corpuses for a given research."""
return preprocess_prepared_corpuses(research)


@app.task
def task_prepare_corpuses(research: Any, task_id: str) -> Any:
"""Prepare corpuses for a given research."""
return prepare_corpuses(research)


@app.task
def task_preprocess_prepared_corpuses(research: Any, task_id: str) -> Any:
"""Preprocess corpuses for a given research."""
return preprocess_prepared_corpuses(research)


@app.task
def task_cluster_preprocessed_corpuses(research: Any, task_id: str) -> Any:
"""Cluster corpuses for a given research."""
return cluster_preprocessed_corpuses(research)


@app.task
def task_clean_intermediate_files(research: Any, task_id: str) -> Any:
"""Clean intermediate files for a given research."""
return clean_intermediate_files(research)


@app.task
def task_generate_plot(research: Any, task_id: str) -> Any:
"""Generate plot for a given research."""
return generate_plot(research)


@app.task
def task_articles(research: Any, task_id: str) -> Any:
"""Extract articles for a given research."""
return back_process_articles(research)


class ArticleTask(SerialCeleryTask):
"""Task to handle articles processing."""

def get_group_tasks(
self, research: Any, task_id: str
) -> list[celery.Signature]:
return [task_articles.s(research, task_id)]


class PreparingTask(SerialCeleryTask):
"""Task to handle corpus preparation."""

def get_group_tasks(
self, research: Any, task_id: str
) -> list[celery.Signature]:
return [
task_prepare_corpuses.s(research, task_id),
]


class PreprocessingTask(SerialCeleryTask):
"""Task to handle corpus preprocessing."""

def get_group_tasks(
self, research: Any, task_id: str
) -> list[celery.Signature]:
return [
task_preprocess_prepared_corpuses.s(research, task_id),
]


class ClusteringTask(SerialCeleryTask):
"""Task to handle corpus clustering."""

def get_group_tasks(
self, research: Any, task_id: str
) -> list[celery.Signature]:
return [
task_cluster_preprocessed_corpuses.s(research, task_id),
]


class PlottingTask(SerialCeleryTask):
"""Task to handle corpus plotting."""

def get_group_tasks(
self, research: Any, task_id: str
) -> list[celery.Signature]:
return [
task_generate_plot.s(research, task_id),
]


class CleaningTask(SerialCeleryTask):
"""Task to handle corpus cleaning."""

def get_group_tasks(
self, research: Any, task_id: str
) -> list[celery.Signature]:
return [
task_clean_intermediate_files.s(research, task_id),
]

0 comments on commit 9fa7dfb

Please sign in to comment.