From 22d2aff23c192eed58420e0c85822c841ecb25ff Mon Sep 17 00:00:00 2001 From: Eric Duong Date: Mon, 7 Oct 2024 11:59:48 -0400 Subject: [PATCH] fix(data-warehouse): add naming convention (#25429) --- posthog/temporal/data_imports/pipelines/pipeline.py | 5 +---- posthog/temporal/data_imports/util.py | 7 +++++-- posthog/temporal/data_modeling/run_workflow.py | 1 + 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/posthog/temporal/data_imports/pipelines/pipeline.py b/posthog/temporal/data_imports/pipelines/pipeline.py index a4cb30ddfdc1b..45aeb1860b29c 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline.py +++ b/posthog/temporal/data_imports/pipelines/pipeline.py @@ -11,7 +11,6 @@ from posthog.settings.base_variables import TEST from structlog.typing import FilteringBoundLogger from dlt.common.libs.deltalake import get_delta_tables -from dlt.common.normalizers.naming.snake_case import NamingConvention from dlt.load.exceptions import LoadClientJobRetry from dlt.sources import DltSource from deltalake.exceptions import DeltaError @@ -104,9 +103,7 @@ async def _prepare_s3_files_for_querying(self, file_uris: list[str]): job: ExternalDataJob = await get_external_data_job(job_id=self.inputs.run_id) schema: ExternalDataSchema = await aget_schema_by_id(self.inputs.schema_id, self.inputs.team_id) - normalized_schema_name = NamingConvention().normalize_identifier(schema.name) - - prepare_s3_files_for_querying(job.folder_path(), normalized_schema_name, file_uris) + prepare_s3_files_for_querying(job.folder_path(), schema.name, file_uris) def _run(self) -> dict[str, int]: if self.refresh_dlt: diff --git a/posthog/temporal/data_imports/util.py b/posthog/temporal/data_imports/util.py index df20385477b18..cb520c2d44192 100644 --- a/posthog/temporal/data_imports/util.py +++ b/posthog/temporal/data_imports/util.py @@ -1,13 +1,16 @@ from posthog.warehouse.s3 import get_s3_client from django.conf import settings +from dlt.common.normalizers.naming.snake_case import NamingConvention def prepare_s3_files_for_querying(folder_path: str, table_name: str, file_uris: list[str]): s3 = get_s3_client() + normalized_table_name = NamingConvention().normalize_identifier(table_name) + s3_folder_for_job = f"{settings.BUCKET_URL}/{folder_path}" - s3_folder_for_schema = f"{s3_folder_for_job}/{table_name}" - s3_folder_for_querying = f"{s3_folder_for_job}/{table_name}__query" + s3_folder_for_schema = f"{s3_folder_for_job}/{normalized_table_name}" + s3_folder_for_querying = f"{s3_folder_for_job}/{normalized_table_name}__query" if s3.exists(s3_folder_for_querying): s3.delete(s3_folder_for_querying, recursive=True) diff --git a/posthog/temporal/data_modeling/run_workflow.py b/posthog/temporal/data_modeling/run_workflow.py index a77cefa29af48..8629bb29cdf76 100644 --- a/posthog/temporal/data_modeling/run_workflow.py +++ b/posthog/temporal/data_modeling/run_workflow.py @@ -330,6 +330,7 @@ async def materialize_model(model_label: str, team: Team) -> tuple[str, DeltaTab table.vacuum(retention_hours=24, enforce_retention_duration=False, dry_run=False) file_uris = table.file_uris() + prepare_s3_files_for_querying(saved_query.folder_path, saved_query.name, file_uris) key, delta_table = tables.popitem()