Skip to content

Commit

Permalink
feat: explicit final message commit status (#1754)
Browse files Browse the repository at this point in the history
* feat: explicit final message commit status

* fix: use old StrEnum

* chore: little polish tests CI

* chore: little polish tests CI

* run ci

* run ci

* run ci

---------

Co-authored-by: Kumaran Rajendhiran <[email protected]>
  • Loading branch information
Lancetnik and kumaranvpl authored Sep 3, 2024
1 parent 09c3ca9 commit 4fe975d
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 24 deletions.
65 changes: 51 additions & 14 deletions .github/workflows/pr_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ on:
types:
- checks_requested

env:
ALL_PYTEST_MARKERS: "not nats and not kafka and not confluent and not rabbit and not redis"

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
Expand Down Expand Up @@ -72,10 +75,13 @@ jobs:
run: uv pip install --system --pre "pydantic>=2.0.0b2,<3.0.0"
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -vv -m "(slow and (not nats and not kafka and not confluent and not rabbit and not redis)) or (not nats and not kafka and not confluent and not rabbit and not redis)"
run: >
bash scripts/test.sh -vv
-m "(slow and ($ALL_PYTEST_MARKERS)) or ($ALL_PYTEST_MARKERS)"
env:
COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
- run: ls coverage
- name: Store coverage files
uses: actions/upload-artifact@v4
with:
Expand All @@ -100,7 +106,9 @@ jobs:
uv pip install --system .[optionals,testing] orjson
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -m"(slow and (not nats and not kafka and not confluent and not rabbit and not redis)) or (not nats and not kafka and not confluent and not rabbit and not redis)"
run: >
bash scripts/test.sh
-m "(slow and ($ALL_PYTEST_MARKERS)) or ($ALL_PYTEST_MARKERS)"
env:
COVERAGE_FILE: coverage/.coverage.orjson
CONTEXT: orjson
Expand All @@ -127,7 +135,9 @@ jobs:
python -m pip install uv
uv pip install --system .[optionals,testing]
- name: Test
run: bash scripts/test.sh -m "(slow and (not nats and not kafka and not confluent and not rabbit and not redis)) or (not nats and not kafka and not confluent and not rabbit and not redis)"
run: |
bash scripts/test.sh
-m "(slow and ($ALL_PYTEST_MARKERS)) or ($ALL_PYTEST_MARKERS)"
test-windows-latest:
if: github.event.pull_request.draft == false
Expand All @@ -144,7 +154,9 @@ jobs:
python -m pip install uv
uv pip install --system .[optionals,testing]
- name: Test
run: bash scripts/test.sh -m "(slow and (not nats and not kafka and not confluent and not rabbit and not redis)) or (not nats and not kafka and not confluent and not rabbit and not redis)"
run: >
bash scripts/test.sh
-m "(slow and ($ALL_PYTEST_MARKERS)) or ($ALL_PYTEST_MARKERS)"
test-kafka-smoke:
if: github.event.pull_request.draft == false
Expand All @@ -161,7 +173,10 @@ jobs:
python -m pip install uv
uv pip install --system .[kafka,test-core]
- name: Test
run: bash scripts/test.sh -m "not kafka" tests/brokers/kafka/test_test_client.py
run: >
bash scripts/test.sh
-m "not kafka"
tests/brokers/kafka/test_test_client.py
test-kafka-real:
if: github.event.pull_request.draft == false
Expand Down Expand Up @@ -198,7 +213,9 @@ jobs:
uv pip install --system .[optionals,testing]
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -m "(slow and kafka) or kafka"
run: >
bash scripts/test.sh
-m "(slow and kafka) or kafka"
env:
COVERAGE_FILE: coverage/.coverage.kafka-py
CONTEXT: kafka-py
Expand All @@ -225,7 +242,10 @@ jobs:
python -m pip install uv
uv pip install --system .[confluent,test-core]
- name: Test
run: bash scripts/test.sh -m "not confluent" tests/brokers/confluent/test_test_client.py
run: >
bash scripts/test.sh
-m "not confluent"
tests/brokers/confluent/test_test_client.py
test-confluent-real:
if: github.event.pull_request.draft == false
Expand Down Expand Up @@ -262,7 +282,9 @@ jobs:
uv pip install --system .[optionals,testing]
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -vv -m "(slow and confluent) or confluent"
run: >
bash scripts/test.sh -vv
-m "(slow and confluent) or confluent"
env:
COVERAGE_FILE: coverage/.coverage.confluent-py
CONTEXT: confluent-py
Expand All @@ -289,7 +311,10 @@ jobs:
python -m pip install uv
uv pip install --system .[rabbit,test-core]
- name: Test
run: bash scripts/test.sh -m "not rabbit" tests/brokers/rabbit/test_test_client.py
run: >
bash scripts/test.sh
-m "not rabbit"
tests/brokers/rabbit/test_test_client.py
test-rabbit-real:
if: github.event.pull_request.draft == false
Expand All @@ -315,7 +340,9 @@ jobs:
uv pip install --system .[optionals,testing]
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -m "(slow and rabbit) or rabbit"
run: >
bash scripts/test.sh
-m "(slow and rabbit) or rabbit"
env:
COVERAGE_FILE: coverage/.coverage.rabbit-py
CONTEXT: rabbit-py
Expand All @@ -342,7 +369,10 @@ jobs:
python -m pip install uv
uv pip install --system .[nats,test-core]
- name: Test
run: bash scripts/test.sh -m "not nats" tests/brokers/nats/test_test_client.py
run: >
bash scripts/test.sh
-m "not nats"
tests/brokers/nats/test_test_client.py
test-nats-real:
if: github.event.pull_request.draft == false
Expand All @@ -368,7 +398,9 @@ jobs:
uv pip install --system .[optionals,testing]
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -m "(slow and nats) or nats"
run: >
bash scripts/test.sh
-m "(slow and nats) or nats"
env:
COVERAGE_FILE: coverage/.coverage.nats-py
CONTEXT: nats-py
Expand All @@ -395,7 +427,10 @@ jobs:
python -m pip install uv
uv pip install --system .[redis,test-core]
- name: Test
run: bash scripts/test.sh -m "not redis" tests/brokers/redis/test_test_client.py
run: >
bash scripts/test.sh
-m "not redis"
tests/brokers/redis/test_test_client.py
test-redis-real:
if: github.event.pull_request.draft == false
Expand All @@ -421,7 +456,9 @@ jobs:
uv pip install --system .[optionals,testing]
- run: mkdir coverage
- name: Test
run: bash scripts/test.sh -m "(slow and redis) or redis"
run: >
bash scripts/test.sh
-m "(slow and redis) or redis"
env:
COVERAGE_FILE: coverage/.coverage.redis-py
CONTEXT: redis-py
Expand Down
27 changes: 17 additions & 10 deletions faststream/broker/message.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
from contextlib import suppress
from dataclasses import dataclass, field
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -28,6 +29,12 @@
MsgType = TypeVar("MsgType")


class AckStatus(str, Enum):
acked = "acked"
nacked = "nacked"
rejected = "rejected"


def gen_cor_id() -> str:
"""Generate random string to use as ID."""
return str(uuid4())
Expand All @@ -52,9 +59,18 @@ class StreamMessage(Generic[MsgType]):
)

processed: bool = field(default=False, init=False)
committed: bool = field(default=False, init=False)
committed: Optional[AckStatus] = field(default=None, init=False)
_decoded_body: Optional["DecodedMessage"] = field(default=None, init=False)

async def ack(self) -> None:
self.committed = AckStatus.acked

async def nack(self) -> None:
self.committed = AckStatus.nacked

async def reject(self) -> None:
self.committed = AckStatus.rejected

async def decode(self) -> Optional["DecodedMessage"]:
"""Serialize the message by lazy decoder."""
# TODO: make it lazy after `decoded_body` removed
Expand Down Expand Up @@ -82,15 +98,6 @@ def decoded_body(self) -> Optional["DecodedMessage"]:
def decoded_body(self, value: Optional["DecodedMessage"]) -> None:
self._decoded_body = value

async def ack(self) -> None:
self.committed = True

async def nack(self) -> None:
self.committed = True

async def reject(self) -> None:
self.committed = True


def decode_message(message: "StreamMessage[Any]") -> "DecodedMessage":
"""Decodes a message."""
Expand Down

0 comments on commit 4fe975d

Please sign in to comment.