Skip to content

Commit

Permalink
Add arq queue
Browse files Browse the repository at this point in the history
Add redis and safir dependencies to the project

Add redis to tests

Co-authored-by: Jonathan Sick <[email protected]>
  • Loading branch information
Fireye04 and jonathansick committed Jun 15, 2023
1 parent 97fe2ee commit bfbece6
Show file tree
Hide file tree
Showing 14 changed files with 375 additions and 525 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ jobs:
tox-${{ matrix.python }}-${{ hashFiles('requirements/*.txt') }}-
- name: Run tox
run: tox -e py,coverage-report,typing
uses: lsst-sqre/run-tox@v1
with:
python-version: ${{ matrix.python }}
tox-envs: 'lint,typing,py,coverage-report'
tox-plugins: "tox-docker"

build:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion requirements/dev.in
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ types-PyYAML

# Documentation
documenteer[guide]==1.0.0a1
sphinx-click
sphinx-click
460 changes: 18 additions & 442 deletions requirements/dev.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ arrow
click
httpx
python-dateutil
safir>=2.0
safir[arq,redis]>=4.0
structlog
pydantic
markdown-it-py[linkify,plugins]
Expand Down
233 changes: 155 additions & 78 deletions requirements/main.txt

Large diffs are not rendered by default.

39 changes: 38 additions & 1 deletion src/semaphore/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@

from enum import Enum
from typing import Any, Mapping, Optional
from urllib.parse import urlparse

from pydantic import BaseSettings, Field, SecretStr, validator
from arq.connections import RedisSettings
from pydantic import BaseSettings, Field, RedisDsn, SecretStr, validator
from safir.arq import ArqMode

# from safir.logging import configure_logging

Expand Down Expand Up @@ -84,6 +87,29 @@ class Config(BaseSettings):
For a list of environments, see https://github.com/lsst-sqre/phalanx.
"""

slack_webhook_url: Optional[SecretStr] = Field(
None, env="SEMAPHORE_SLACK_PRIVATE_KEY"
)
"""The Slack app private key."""

redis_url: RedisDsn = Field(
env="SEMAPHORE_REDIS_URL",
default_factory=lambda: RedisDsn(
"redis://localhost:6379/0", scheme="redis"
),
)
"""URL for the redis instance, used by the worker queue."""

redis_queue_url: RedisDsn = Field(
env="SEMAPHORE_REDIS_URL",
default_factory=lambda: RedisDsn(
"redis://localhost:6379/0", scheme="redis"
),
)
"""URL for the redis instance, used by the arq queue."""

arq_mode: ArqMode = Field(ArqMode.production, env="SEMAPHORE_ARQ_MODE")

@validator("github_webhook_secret", "github_app_private_key", pre=True)
def validate_none_secret(
cls, v: Optional[SecretStr]
Expand Down Expand Up @@ -123,5 +149,16 @@ def validate_github_app(cls, v: bool, values: Mapping[str, Any]) -> bool:

return True

@property
def arq_redis_settings(self) -> RedisSettings:
"""Create a Redis settings instance for arq."""
url_parts = urlparse(self.redis_queue_url)
redis_settings = RedisSettings(
host=url_parts.hostname or "localhost",
port=url_parts.port or 6379,
database=int(url_parts.path.lstrip("/")) if url_parts.path else 0,
)
return redis_settings


config = Config()
44 changes: 44 additions & 0 deletions src/semaphore/dependencies/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Redis dependency for FastAPI."""

from typing import Optional

from redis.asyncio import Redis

__all__ = ["RedisDependency", "redis_dependency"]


class RedisDependency:
"""Provides an asyncio-based Redis client as a dependency.
Notes
-----
This dependency must be initialized in a start-up hook (`initialize`) and
closed in a shut down hook (`close`).
"""

def __init__(self) -> None:
self.redis: Redis | None = None

async def initialize(
self, redis_url: str, password: Optional[str] = None
) -> None:
self.redis = Redis.from_url(redis_url, password=password)

async def __call__(self) -> Redis:
"""Returns the redis pool."""
if self.redis is None:
raise RuntimeError("RedisDependency is not initialized")
return self.redis

async def close(self) -> None:
"""Close the open Redis pool.
Should be called from a shutdown hook to ensure that the Redis clients
are cleanly shut down and any pending writes are complete.
"""
if self.redis:
await self.redis.close()
await self.redis.connection_pool.disconnect()
self.redis = None


redis_dependency = RedisDependency()
"""The dependency that will return the Redis pool."""
8 changes: 8 additions & 0 deletions src/semaphore/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.utils import get_openapi
from safir.dependencies.arq import arq_dependency
from safir.dependencies.http_client import http_client_dependency
from safir.logging import configure_logging
from safir.middleware.x_forwarded import XForwardedMiddleware

from .config import config
from .dependencies.broadcastrepo import broadcast_repo_dependency
from .dependencies.redis import redis_dependency
from .github.broadcastservices import bootstrap_broadcast_repo
from .handlers.external import external_router
from .handlers.internal import internal_router
Expand Down Expand Up @@ -64,6 +66,11 @@ async def startup_event() -> None:
logger = structlog.get_logger(config.logger_name)
logger.info("Running startup")

await redis_dependency.initialize(config.redis_url)
await arq_dependency.initialize(
mode=config.arq_mode, redis_settings=config.arq_redis_settings
)

broadcast_repo = await broadcast_repo_dependency()
if config.enable_github_app:
await bootstrap_broadcast_repo(
Expand All @@ -76,6 +83,7 @@ async def startup_event() -> None:
@app.on_event("shutdown")
async def shutdown_event() -> None:
await http_client_dependency.aclose()
await redis_dependency.close()


def create_openapi() -> str:
Expand Down
Empty file.
5 changes: 5 additions & 0 deletions src/semaphore/worker/functions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .ping import ping

__all__ = [
"ping",
]
11 changes: 11 additions & 0 deletions src/semaphore/worker/functions/ping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""A proof-of-concept worker function."""

from __future__ import annotations

from typing import Any, Dict


async def ping(ctx: Dict[Any, Any]) -> str:
logger = ctx["logger"].bind(task="ping")
logger.info("Running ping")
return "pong"
Empty file.
71 changes: 71 additions & 0 deletions src/semaphore/worker/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Arq-based queue worker lifecycle configuration."""

from __future__ import annotations

import uuid
from typing import Any, Dict

import httpx
import structlog
from safir.logging import configure_logging

from semaphore.config import config
from semaphore.dependencies.redis import redis_dependency

from .functions import ping


async def startup(ctx: Dict[Any, Any]) -> None:
"""Runs during working start-up to set up the worker context."""
configure_logging(
profile=config.profile,
log_level=config.log_level,
name="semaphore",
)
logger = structlog.get_logger("semaphore")
# The instance key uniquely identifies this worker in logs
instance_key = uuid.uuid4().hex
logger = logger.bind(worker_instance=instance_key)

logger.info("Starting up worker")

http_client = httpx.AsyncClient()
ctx["http_client"] = http_client

ctx["logger"] = logger
logger.info("Start up complete")
await redis_dependency.initialize(config.redis_url)


async def shutdown(ctx: Dict[Any, Any]) -> None:
"""Runs during worker shut-down to resources."""
if "logger" in ctx.keys():
logger = ctx["logger"]
else:
logger = structlog.get_logger(__name__)
logger.info("Running worker shutdown.")

await redis_dependency.close()

try:
await ctx["http_client"].aclose()
except Exception as e:
logger.warning("Issue closing the http_client: %s", str(e))

logger.info("Worker shutdown complete.")


class WorkerSettings:
"""Configuration for a Times Square arq worker.
See `arq.worker.Worker` for details on these attributes.
"""

functions = [
ping,
]

redis_settings = config.arq_redis_settings

on_startup = startup

on_shutdown = shutdown
19 changes: 18 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,19 @@
envlist = py,typing,lint,docs,coverage-report
isolated_build = True

[docker:redis]
image = redis:latest
ports =
6379:6379/tcp
healthcheck_cmd =
redis-cli ping
healthcheck_timeout = 1
healthcheck_retries = 30
healthcheck_interval = 1
healthcheck_start_period = 1

[testenv]
description = Run pytest.
description = Environment with test dependencies.
deps =
-r{toxinidir}/requirements/main.txt
-r{toxinidir}/requirements/dev.txt
Expand All @@ -15,6 +26,12 @@ setenv =
commands =
pytest -vv --cov=semaphore --cov-branch --cov-report= -n auto {posargs}

[testenv:docker]
description = Run pytest with PostgreSQL via Docker.
docker =
postgres
redis

[testenv:coverage-report]
description = Compile coverage from each test run.
skip_install = true
Expand Down

0 comments on commit bfbece6

Please sign in to comment.