Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
pencil committed Dec 12, 2024
0 parents commit 1e9f970
Show file tree
Hide file tree
Showing 26 changed files with 980 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
venv/
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2024 Parnassus Labs, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.PHONY: test
test:
pytest dramatiq_workflow
96 changes: 96 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# dramatiq-workflow

`dramatiq-workflow` allows running workflows (chains and groups of tasks) using
the Python background task processing library [dramatiq](https://dramatiq.io/).

A workflow allows running tasks in parallel and in sequence. It is a way to
define a workflow of tasks, a combination of chains and groups in any order and
nested as needed.

## Features

- Define workflows with tasks running in parallel and in sequence.
- Nest chains and groups of tasks to create complex workflows.
- Schedules workflows to run in the background using dramatiq.

**Note:** `dramatiq-workflow` does not support passing the results from one task
to the next one in a chain. We recommend using a database to store intermediate
results if needed.

## Installation

You can install `dramatiq-workflow` from PyPI:

```sh
pip install dramatiq-workflow
```

## Example

Let's assume we want a workflow that looks like this:

```text
╭────────╮ ╭────────╮
│ Task 2 │ │ Task 5 │
╭──┼● ●┼──┼● ●┼╮
╭────────╮│ ╰────────╯ ╰────────╯│ ╭────────╮
│ Task 1 ││ ╭────────╮ │ │ Task 8 │
│ ●┼╯ │ Task 3 │ ╰──┼● │
│ ●┼───┼● ●┼───────────────┼● │
│ ●┼╮ ╰────────╯ ╭─┼● │
╰────────╯│ ╭────────╮ ╭────────╮│╭┼● │
│ │ Task 4 │ │ Task 6 │││╰────────╯
╰──┼● ●┼───┼● ●┼╯│
│ ●┼╮ ╰────────╯ │
╰────────╯│ │
│ ╭────────╮ │
│ │ Task 7 │ │
╰──┼● ●┼─╯
╰────────╯
```

We can define this workflow as follows:

```python
from dramatiq_workflow import Workflow, Chain, Group

workflow = Workflow(
Chain(
task1.message(),
Group(
Chain(
task2.message(),
task5.message(),
),
task3.message(),
Chain(
task4.message(),
Group(
task6.message(),
task7.message(),
),
),
),
task8.message(),
),
)
workflow.run() # Schedules the workflow to run in the background
```

### Execution Order

In this example, the execution would look like this:

1. Task 1 runs
2. Task 2, 3, and 4 run in parallel once Task 1 finishes
3. Task 5 runs once Task 2 finishes
4. Task 6 and 7 run in parallel once Task 4 finishes
5. Task 8 runs once Task 5, 6, and 7 finish

*This is a simplified example. The actual execution order may vary because
tasks that can run in parallel (i.e., in a Group) are not guaranteed to run in
the order they are defined in the workflow.*

## License

This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.
13 changes: 13 additions & 0 deletions dramatiq_workflow/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from ._base import Workflow
from ._middleware import WorkflowMiddleware
from ._models import Chain, Group, Message, WithDelay, WorkflowType

__all__ = [
"Chain",
"Group",
"Message",
"WithDelay",
"Workflow",
"WorkflowMiddleware",
"WorkflowType",
]
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
38 changes: 38 additions & 0 deletions dramatiq_workflow/_barrier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import dramatiq.rate_limits


class AtMostOnceBarrier(dramatiq.rate_limits.Barrier):
"""
The AtMostOnceBarrier is a barrier that ensures that it is released at most
once.
We use this because we want to avoid running callbacks in chains multiple
times. Running callbacks more than once can have compounding effects
especially when groups are involved.
The downside of this is that we cannot guarantee that the barrier will be
released at all. Theoretically a worker could die after releasing the
barrier but just before it has a chance to schedule the callbacks.
"""

def __init__(self, backend, key, *args, ttl=900000):
super().__init__(backend, key, *args, ttl=ttl)
self.ran_key = f"{key}_ran"

def create(self, parties):
self.backend.add(self.ran_key, 0, self.ttl)
return super().create(parties)

def wait(self, *args, block=True, timeout=None):
if block:
# Blocking with an AtMostOnceBarrier is not supported as it could
# lead to clients waiting indefinitely if the barrier already
# released.
raise ValueError("Blocking is not supported by AtMostOnceBarrier")

released = super().wait(*args, block=False)
if released:
never_released = self.backend.incr(self.ran_key, 1, 1, self.ttl)
return never_released

return False
185 changes: 185 additions & 0 deletions dramatiq_workflow/_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import logging
import time
from uuid import uuid4

import dramatiq
import dramatiq.rate_limits

from ._constants import CALLBACK_BARRIER_TTL, OPTION_KEY_CALLBACKS
from ._helpers import workflow_with_completion_callbacks
from ._middleware import WorkflowMiddleware, workflow_noop
from ._models import Barrier, Chain, CompletionCallbacks, Group, Message, WithDelay, WorkflowType
from ._serialize import serialize_callbacks, serialize_workflow

logger = logging.getLogger(__name__)


class Workflow:
"""
A workflow allows running tasks in parallel and in sequence. It is a way to
define a workflow of tasks, a combination of chains and groups in any
order and nested as needed.
Example:
Let's assume we want a workflow that looks like this:
╭────────╮ ╭────────╮
│ Task 2 │ │ Task 5 │
╭──┼● ●┼──┼● ●┼╮
╭────────╮│ ╰────────╯ ╰────────╯│ ╭────────╮
│ Task 1 ││ ╭────────╮ │ │ Task 8 │
│ ●┼╯ │ Task 3 │ ╰──┼● │
│ ●┼───┼● ●┼───────────────┼● │
│ ●┼╮ ╰────────╯ ╭─┼● │
╰────────╯│ ╭────────╮ ╭────────╮│╭┼● │
│ │ Task 4 │ │ Task 6 │││╰────────╯
╰──┼● ●┼───┼● ●┼╯│
│ ●┼╮ ╰────────╯ │
╰────────╯│ │
│ ╭────────╮ │
│ │ Task 7 │ │
╰──┼● ●┼─╯
╰────────╯
We can define this workflow as follows:
```python
from dramatiq_workflow import Workflow, Chain, Group
workflow = Workflow(
Chain(
task1.message(),
Group(
Chain(
task2.message(),
task5.message(),
),
task3.message(),
Chain(
task4.message(),
Group(
task6.message(),
task7.message(),
),
),
),
task8.message(),
),
)
workflow.run() # Schedules the workflow to run in the background
```
In this example, the execution would look like this*:
1. Task 1 runs
2. Task 2, 3, and 4 run in parallel once Task 1 finishes
3. Task 5 runs once Task 2 finishes
4. Task 6 and 7 run in parallel once Task 4 finishes
5. Task 8 runs once Task 5, 6, and 7 finish
* This is a simplified example. The actual execution order may vary because
tasks that can run in parallel (i.e. in a Group) are not guaranteed to run
in the order they are defined in the workflow.
"""

def __init__(
self,
workflow: WorkflowType,
broker: dramatiq.Broker | None = None,
):
self.workflow = workflow
self.broker = broker or dramatiq.get_broker()

self._delay = None
self._completion_callbacks = []

while isinstance(self.workflow, WithDelay):
self._delay = (self._delay or 0) + self.workflow.delay
self.workflow = self.workflow.task

def run(self):
current = self.workflow
completion_callbacks = self._completion_callbacks.copy()

if isinstance(current, Message):
current = self.__augment_message(current, completion_callbacks)
self.broker.enqueue(current, delay=self._delay)
return

if isinstance(current, Chain):
tasks = current.tasks[:]
if not tasks:
self.__schedule_noop(completion_callbacks)
return

task = tasks.pop(0)
if tasks:
completion_id = self.__create_barrier(1)
completion_callbacks.append((completion_id, Chain(*tasks), False))
self.__workflow_with_completion_callbacks(task, completion_callbacks).run()
return

if isinstance(current, Group):
tasks = current.tasks[:]
if not tasks:
self.__schedule_noop(completion_callbacks)
return

completion_id = self.__create_barrier(len(tasks))
completion_callbacks.append((completion_id, None, True))
for task in tasks:
self.__workflow_with_completion_callbacks(task, completion_callbacks).run()
return

raise TypeError(f"Unsupported workflow type: {type(current)}")

def __workflow_with_completion_callbacks(self, task, completion_callbacks) -> "Workflow":
return workflow_with_completion_callbacks(
task,
self.broker,
completion_callbacks,
delay=self._delay,
)

def __schedule_noop(self, completion_callbacks: CompletionCallbacks):
noop_message = workflow_noop.message()
noop_message = self.__augment_message(noop_message, completion_callbacks)
self.broker.enqueue(noop_message, delay=self._delay)

def __augment_message(self, message: Message, completion_callbacks: CompletionCallbacks) -> Message:
return message.copy(
# We reset the message timestamp to better represent the time the
# message was actually enqueued. This is to avoid tripping the max_age
# check in the broker.
message_timestamp=time.time() * 1000,
options={OPTION_KEY_CALLBACKS: serialize_callbacks(completion_callbacks)},
)

@property
def __rate_limiter_backend(self):
if not hasattr(self, "__cached_rate_limiter_backend"):
for middleware in self.broker.middleware:
if isinstance(middleware, WorkflowMiddleware):
self.__cached_rate_limiter_backend = middleware.rate_limiter_backend
break
else:
raise RuntimeError(
"WorkflowMiddleware middleware not found! Did you forget "
"to set it up? It is required if you want to use "
"workflows."
)
return self.__cached_rate_limiter_backend

def __create_barrier(self, count: int):
if count == 1:
# No need to create a distributed barrier if there is only one task
return None

completion_uuid = str(uuid4())
completion_barrier = Barrier(self.__rate_limiter_backend, completion_uuid, ttl=CALLBACK_BARRIER_TTL)
completion_barrier.create(count)
logger.debug("Barrier created: %s (%d tasks)", completion_uuid, count)
return completion_uuid

def __str__(self):
return f"Workflow({serialize_workflow(self.workflow)})"
2 changes: 2 additions & 0 deletions dramatiq_workflow/_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CALLBACK_BARRIER_TTL = 86_400_000
OPTION_KEY_CALLBACKS = "workflow_completion_callbacks"
18 changes: 18 additions & 0 deletions dramatiq_workflow/_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import dramatiq

from ._models import CompletionCallbacks, WorkflowType


def workflow_with_completion_callbacks(
workflow: WorkflowType,
broker: dramatiq.Broker,
completion_callbacks: CompletionCallbacks,
delay: int | None = None,
):
from ._base import Workflow

w = Workflow(workflow, broker)
w._completion_callbacks = completion_callbacks
if delay is not None:
w._delay = (w._delay or 0) + delay
return w
Loading

0 comments on commit 1e9f970

Please sign in to comment.