Skip to content

Commit

Permalink
move around ph_client
Browse files Browse the repository at this point in the history
  • Loading branch information
EDsCODE committed Nov 7, 2023
1 parent 2dc3d20 commit 18de41f
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 43 deletions.
43 changes: 11 additions & 32 deletions posthog/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
from posthog.cloud_utils import is_cloud
from posthog.metrics import pushed_metrics_registry
from posthog.redis import get_client
from posthog.utils import get_crontab, get_instance_region
from posthog.utils import get_crontab
from posthog.ph_client import get_ph_client

# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "posthog.settings")
Expand Down Expand Up @@ -333,6 +334,13 @@ def setup_periodic_tasks(sender: Celery, **kwargs):
name="sync datawarehouse sources that have settled in s3 bucket",
)

# Every 2 hours try to retrieve and calculate total rows synced in period
sender.add_periodic_task(
crontab(hour="*/2"),
calculate_external_data_rows_synced.s(),
name="calculate external data rows synced",
)


# Set up clickhouse query instrumentation
@task_prerun.connect
Expand Down Expand Up @@ -900,38 +908,13 @@ def debug_task(self):
print(f"Request: {self.request!r}")


def _get_ph_client():
from posthoganalytics import Posthog

if not is_cloud():
return

# send EU data to EU, US data to US
api_key = None
host = None
region = get_instance_region()
if region == "EU":
api_key = "phc_dZ4GK1LRjhB97XozMSkEwPXx7OVANaJEwLErkY1phUF"
host = "https://eu.posthog.com"
elif region == "US":
api_key = "sTMFPsFhdP1Ssg"
host = "https://app.posthog.com"

if not api_key:
return

ph_client = Posthog(api_key, host=host)

return ph_client


@app.task(ignore_result=True)
def calculate_decide_usage() -> None:
from django.db.models import Q
from posthog.models import Team
from posthog.models.feature_flag.flag_analytics import capture_team_decide_usage

ph_client = _get_ph_client()
ph_client = get_ph_client()

for team in Team.objects.select_related("organization").exclude(
Q(organization__for_internal_metrics=True) | Q(is_demo=True)
Expand All @@ -947,14 +930,10 @@ def calculate_external_data_rows_synced() -> None:
from posthog.models import Team
from posthog.tasks.warehouse import calculate_workspace_rows_synced_by_team

ph_client = _get_ph_client()

for team in Team.objects.select_related("organization").exclude(
Q(organization__for_internal_metrics=True) | Q(is_demo=True) | Q(external_data_workspace_id__isnull=True)
):
calculate_workspace_rows_synced_by_team(ph_client, team.pk)

ph_client.shutdown()
calculate_workspace_rows_synced_by_team.delay(team.pk)


@app.task(ignore_result=True)
Expand Down
27 changes: 27 additions & 0 deletions posthog/ph_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from posthog.utils import get_instance_region
from posthog.cloud_utils import is_cloud


def get_ph_client():
from posthoganalytics import Posthog

if not is_cloud():
return

# send EU data to EU, US data to US
api_key = None
host = None
region = get_instance_region()
if region == "EU":
api_key = "phc_dZ4GK1LRjhB97XozMSkEwPXx7OVANaJEwLErkY1phUF"
host = "https://eu.posthog.com"
elif region == "US":
api_key = "sTMFPsFhdP1Ssg"
host = "https://app.posthog.com"

if not api_key:
return

ph_client = Posthog(api_key, host=host)

return ph_client
18 changes: 9 additions & 9 deletions posthog/tasks/test/test_warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ def test_traverse_jobs_by_field(self, send_request_mock):
)

@patch("posthog.tasks.warehouse._traverse_jobs_by_field")
@patch("posthog.tasks.warehouse.get_ph_client")
@freeze_time("2023-11-07")
def test_calculate_workspace_rows_synced_by_team(self, traverse_jobs_mock):
def test_calculate_workspace_rows_synced_by_team(self, mock_capture, traverse_jobs_mock):
traverse_jobs_mock.return_value = [
{"count": 97747, "startTime": "2023-11-05T18:32:41Z"},
{"count": 93353, "startTime": "2023-11-07T16:50:49Z"},
]

mock_capture = MagicMock()
calculate_workspace_rows_synced_by_team(mock_capture, self.team.pk)
calculate_workspace_rows_synced_by_team(self.team.pk)

self.team.refresh_from_db()
self.assertEqual(self.team.external_data_workspace_rows_synced_in_month, 191100)
Expand All @@ -70,16 +70,16 @@ def test_calculate_workspace_rows_synced_by_team(self, traverse_jobs_mock):
)

@patch("posthog.tasks.warehouse._traverse_jobs_by_field")
@patch("posthog.tasks.warehouse.get_ph_client")
@freeze_time("2023-11-07")
def test_calculate_workspace_rows_synced_by_team_month_cutoff(self, traverse_jobs_mock):
def test_calculate_workspace_rows_synced_by_team_month_cutoff(self, mock_capture, traverse_jobs_mock):
# external_data_workspace_last_synced unset
traverse_jobs_mock.return_value = [
{"count": 97747, "startTime": "2023-10-30T18:32:41Z"},
{"count": 93353, "startTime": "2023-11-07T16:50:49Z"},
]

mock_capture = MagicMock()
calculate_workspace_rows_synced_by_team(mock_capture, self.team.pk)
calculate_workspace_rows_synced_by_team(self.team.pk)

self.team.refresh_from_db()
self.assertEqual(self.team.external_data_workspace_rows_synced_in_month, 93353)
Expand All @@ -89,8 +89,9 @@ def test_calculate_workspace_rows_synced_by_team_month_cutoff(self, traverse_job
)

@patch("posthog.tasks.warehouse._traverse_jobs_by_field")
@patch("posthog.tasks.warehouse.get_ph_client")
@freeze_time("2023-11-07")
def test_calculate_workspace_rows_synced_by_team_month_cutoff_field_set(self, traverse_jobs_mock):
def test_calculate_workspace_rows_synced_by_team_month_cutoff_field_set(self, mock_capture, traverse_jobs_mock):
self.team.external_data_workspace_last_synced = datetime.datetime(
2023, 10, 29, 18, 32, 41, tzinfo=datetime.timezone.utc
)
Expand All @@ -99,8 +100,7 @@ def test_calculate_workspace_rows_synced_by_team_month_cutoff_field_set(self, tr
{"count": 93353, "startTime": "2023-11-07T16:50:49Z"},
]

mock_capture = MagicMock()
calculate_workspace_rows_synced_by_team(mock_capture, self.team.pk)
calculate_workspace_rows_synced_by_team(self.team.pk)

self.team.refresh_from_db()
self.assertEqual(self.team.external_data_workspace_rows_synced_in_month, 93353)
Expand Down
10 changes: 8 additions & 2 deletions posthog/tasks/warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@
from posthog.models import Team
from posthog.warehouse.external_data_source.client import send_request
from urllib.parse import urlencode
from posthog.ph_client import get_ph_client

from posthog.celery import app

AIRBYTE_JOBS_URL = "https://api.airbyte.com/v1/jobs"

DEFAULT_DATE_TIME = datetime.datetime(2023, 11, 7, tzinfo=datetime.timezone.utc)


# TODO: split these into their own tasks
def calculate_workspace_rows_synced_by_team(ph_client, team_id):
@app.task(ignore_result=True, max_retries=2)
def calculate_workspace_rows_synced_by_team(team_id):
ph_client = get_ph_client()
team = Team.objects.get(pk=team_id)
now = datetime.datetime.now(datetime.timezone.utc)
begin = team.external_data_workspace_last_synced or now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
Expand Down Expand Up @@ -51,6 +55,8 @@ def calculate_workspace_rows_synced_by_team(ph_client, team_id):
team.external_data_workspace_rows_synced_in_month = total
team.save()

ph_client.shutdown()


def _traverse_jobs_by_field(ph_client, team, url, field, acc=[]):
response = send_request(url, method="GET")
Expand Down

0 comments on commit 18de41f

Please sign in to comment.