feat(queries): Use analytics limited queue for cache warming tasks (#…
webjunkie authored Jul 18, 2024
1 parent 937625d commit 3da4fed
Showing 3 changed files with 8 additions and 10 deletions.
bin/celery-queues.env (1 addition, 1 deletion)
@@ -2,4 +2,4 @@
 # Important: Add new queues to make Celery consume tasks from them.

 # NOTE: Keep in sync with posthog/tasks/utils.py
-CELERY_WORKER_QUEUES=celery,stats,email,analytics_queries,long_running,exports,subscription_delivery,usage_reports,session_replay_embeddings,session_replay_general,session_replay_persistence
+CELERY_WORKER_QUEUES=celery,stats,email,analytics_queries,analytics_limited,long_running,exports,subscription_delivery,usage_reports,session_replay_embeddings,session_replay_general,session_replay_persistence
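The NOTE above asks that this list stay in sync with the CeleryQueue enum in posthog/tasks/utils.py (changed below). A minimal sketch of a startup sanity check, my assumption rather than anything in this commit:

import os

from posthog.tasks.utils import CeleryQueue

# A queue defined in code but absent from CELERY_WORKER_QUEUES would never be
# consumed by any worker, which is exactly the failure mode the NOTE warns about.
env_queues = set(os.environ.get("CELERY_WORKER_QUEUES", "").split(","))
enum_queues = {queue.value for queue in CeleryQueue}
missing = enum_queues - env_queues
assert not missing, f"Queues with no consumer: {missing}"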
posthog/caching/warming.py (6 additions, 9 deletions)
@@ -17,7 +17,6 @@
 from posthog.hogql_queries.legacy_compatibility.flagged_conversion_manager import conversion_to_query_based
 from posthog.hogql_queries.query_runner import ExecutionMode
 from posthog.models import Team, Insight, DashboardTile
-from posthog.schema import GenericCachedQueryResponse
 from posthog.tasks.utils import CeleryQueue

 logger = structlog.get_logger(__name__)
@@ -80,25 +79,23 @@ def priority_insights(team: Team) -> Generator[tuple[int, Optional[int]], None, None]:
     yield from dashboard_tiles


-@shared_task(ignore_result=True, expires=60 * 60)
+@shared_task(ignore_result=True, expires=60 * 15)
 def schedule_warming_for_teams_task():
-    team_ids = largest_teams(limit=3)
+    team_ids = largest_teams(limit=10)

     teams = Team.objects.filter(Q(pk__in=team_ids) | Q(extra_settings__insights_cache_warming=True))

     logger.info("Warming insight cache: teams", team_ids=[team.pk for team in teams])

-    # TODO: Needs additional thoughts about concurrency and rate limiting if we launch chains for a lot of teams at once
-
     for team in teams:
         insight_tuples = priority_insights(team)

-        task_groups = chain(*(warm_insight_cache_task.si(*insight_tuple) for insight_tuple in insight_tuples))
-        task_groups.apply_async()
+        # We chain the task execution to prevent queries *for a single team* running at the same time
+        chain(*(warm_insight_cache_task.si(*insight_tuple) for insight_tuple in insight_tuples))()


 @shared_task(
-    queue=CeleryQueue.LONG_RUNNING.value,
+    queue=CeleryQueue.ANALYTICS_LIMITED.value,  # Important! Prevents Clickhouse from being overwhelmed
     ignore_result=True,
     expires=60 * 60,
     autoretry_for=(CHQueryErrorTooManySimultaneousQueries,),
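The loop body change is behavior-preserving: calling a Celery chain with no arguments delegates to apply_async(), so the intermediate task_groups variable was simply dropped. A standalone sketch of the chaining pattern, with illustrative names (demo_app, step) and task_always_eager set only so it runs without a broker:

from celery import Celery, chain

demo_app = Celery("demo")
demo_app.conf.task_always_eager = True  # run in-process for this demo only

@demo_app.task
def step(n: int) -> None:
    print(f"step {n}")

# .si() builds an immutable signature: each link ignores the previous task's
# return value, so the chain enforces sequential execution and nothing else.
workflow = chain(*(step.si(i) for i in range(3)))
workflow()  # for a chain, this is equivalent to workflow.apply_async()

Per the new inline comment, the chain serializes queries within a team; the cap on concurrency across teams now comes from the analytics_limited queue the tasks are routed to.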
@@ -132,7 +129,7 @@ def warm_insight_cache_task(insight_id: int, dashboard_id: int):
         PRIORITY_INSIGHTS_COUNTER.labels(
             team_id=insight.team_id,
             dashboard=dashboard_id is not None,
-            is_cached=results.is_cached if isinstance(results, GenericCachedQueryResponse) else False,
+            is_cached=getattr(results, "is_cached", False),
         ).inc()
     except CHQueryErrorTooManySimultaneousQueries:
         raise
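Swapping the isinstance check for getattr with a default is what allows the GenericCachedQueryResponse import to be removed in the first hunk: any response object exposing is_cached is counted, and everything else falls back to False. A trivial illustration with a hypothetical class:

class UncachedResponse:
    pass

assert getattr(UncachedResponse(), "is_cached", False) is False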
posthog/tasks/utils.py (1 addition, 0 deletions)
@@ -31,6 +31,7 @@ class CeleryQueue(Enum):
     EMAIL = "email"
     LONG_RUNNING = "long_running"  # any task that has a good chance of taking more than a few seconds should go here
     ANALYTICS_QUERIES = "analytics_queries"
+    ANALYTICS_LIMITED = "analytics_limited"
     EXPORTS = "exports"
     SUBSCRIPTION_DELIVERY = "subscription_delivery"
     USAGE_REPORTS = "usage_reports"
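With the enum member added, other tasks can opt into the same capped pool the warming task now uses. A hedged sketch (do_limited_query is hypothetical): throughput on this queue is bounded by however many workers consume analytics_limited, which the env change above wires up.

from celery import shared_task

from posthog.tasks.utils import CeleryQueue

@shared_task(queue=CeleryQueue.ANALYTICS_LIMITED.value, ignore_result=True)
def do_limited_query() -> None:
    # Routed to analytics_limited; the worker pool consuming that queue caps
    # how many of these run against ClickHouse at once.
    ...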