diff --git a/posthog/settings/data_warehouse.py b/posthog/settings/data_warehouse.py index a0a78a9621e3a..884264c674915 100644 --- a/posthog/settings/data_warehouse.py +++ b/posthog/settings/data_warehouse.py @@ -1,4 +1,5 @@ import os +from posthog.settings.utils import get_list AIRBYTE_API_KEY = os.getenv("AIRBYTE_API_KEY", None) AIRBYTE_BUCKET_REGION = os.getenv("AIRBYTE_BUCKET_REGION", None) @@ -9,3 +10,5 @@ BUCKET_URL = os.getenv("BUCKET_URL", None) AIRBYTE_BUCKET_NAME = os.getenv("AIRBYTE_BUCKET_NAME", None) BUCKET = "test-pipeline" + +V2_PIPELINE_ENABLED_TEAM_IDS = get_list(os.getenv("V2_PIPELINE_ENABLED_TEAM_IDS", "")) diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index 59508a2ee6f25..9e0f9ca225a84 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -21,7 +21,7 @@ from posthog.settings.base_variables import TEST from posthog.temporal.batch_exports.base import PostHogWorkflow from posthog.temporal.common.client import sync_connect -from posthog.temporal.data_imports.util import is_posthog_team +from posthog.temporal.data_imports.util import is_posthog_team, is_enabled_for_team from posthog.temporal.data_imports.workflow_activities.check_billing_limits import ( CheckBillingLimitsActivityInputs, check_billing_limits_activity, @@ -211,7 +211,7 @@ async def run(self, inputs: ExternalDataWorkflowInputs): if ( settings.TEMPORAL_TASK_QUEUE != DATA_WAREHOUSE_TASK_QUEUE_V2 and not TEST - and is_posthog_team(inputs.team_id) + and (is_posthog_team(inputs.team_id) or is_enabled_for_team(inputs.team_id)) ): await workflow.execute_activity( trigger_pipeline_v2, diff --git a/posthog/temporal/data_imports/util.py b/posthog/temporal/data_imports/util.py index 4a133ef336b42..1900c237c1c2a 100644 --- a/posthog/temporal/data_imports/util.py +++ b/posthog/temporal/data_imports/util.py @@ -44,3 +44,7 @@ def is_posthog_team(team_id: int) -> bool: region = get_from_env("CLOUD_DEPLOYMENT", optional=True) return (region == "EU" and team_id == 1) or (region == "US" and team_id == 2) + + +def is_enabled_for_team(team_id: int) -> bool: + return str(team_id) in settings.V2_PIPELINE_ENABLED_TEAM_IDS