Skip to content

Commit

Permalink
fix(data-warehouse): add naming convention (#25429)
Browse files Browse the repository at this point in the history
  • Loading branch information
EDsCODE authored Oct 7, 2024
1 parent fb92840 commit 22d2aff
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
5 changes: 1 addition & 4 deletions posthog/temporal/data_imports/pipelines/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from posthog.settings.base_variables import TEST
from structlog.typing import FilteringBoundLogger
from dlt.common.libs.deltalake import get_delta_tables
from dlt.common.normalizers.naming.snake_case import NamingConvention
from dlt.load.exceptions import LoadClientJobRetry
from dlt.sources import DltSource
from deltalake.exceptions import DeltaError
Expand Down Expand Up @@ -104,9 +103,7 @@ async def _prepare_s3_files_for_querying(self, file_uris: list[str]):
job: ExternalDataJob = await get_external_data_job(job_id=self.inputs.run_id)
schema: ExternalDataSchema = await aget_schema_by_id(self.inputs.schema_id, self.inputs.team_id)

normalized_schema_name = NamingConvention().normalize_identifier(schema.name)

prepare_s3_files_for_querying(job.folder_path(), normalized_schema_name, file_uris)
prepare_s3_files_for_querying(job.folder_path(), schema.name, file_uris)

def _run(self) -> dict[str, int]:
if self.refresh_dlt:
Expand Down
7 changes: 5 additions & 2 deletions posthog/temporal/data_imports/util.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from posthog.warehouse.s3 import get_s3_client
from django.conf import settings
from dlt.common.normalizers.naming.snake_case import NamingConvention


def prepare_s3_files_for_querying(folder_path: str, table_name: str, file_uris: list[str]):
s3 = get_s3_client()

normalized_table_name = NamingConvention().normalize_identifier(table_name)

s3_folder_for_job = f"{settings.BUCKET_URL}/{folder_path}"
s3_folder_for_schema = f"{s3_folder_for_job}/{table_name}"
s3_folder_for_querying = f"{s3_folder_for_job}/{table_name}__query"
s3_folder_for_schema = f"{s3_folder_for_job}/{normalized_table_name}"
s3_folder_for_querying = f"{s3_folder_for_job}/{normalized_table_name}__query"

if s3.exists(s3_folder_for_querying):
s3.delete(s3_folder_for_querying, recursive=True)
Expand Down
1 change: 1 addition & 0 deletions posthog/temporal/data_modeling/run_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ async def materialize_model(model_label: str, team: Team) -> tuple[str, DeltaTab
table.vacuum(retention_hours=24, enforce_retention_duration=False, dry_run=False)

file_uris = table.file_uris()

prepare_s3_files_for_querying(saved_query.folder_path, saved_query.name, file_uris)

key, delta_table = tables.popitem()
Expand Down

0 comments on commit 22d2aff

Please sign in to comment.