Skip to content

Commit

Permalink
Fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 committed Jun 17, 2024
1 parent a3a9cad commit 8baacbf
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
20 changes: 12 additions & 8 deletions posthog/temporal/tests/external_data/test_external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,17 +273,18 @@ async def setup_job_1():
job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
)

customer_schema = await _create_schema("Customer", new_source, team)

new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(
team_id=team.id,
pipeline_id=new_source.pk,
status=ExternalDataJob.Status.RUNNING,
rows_synced=0,
schema=customer_schema,
)

new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()

customer_schema = await _create_schema("Customer", new_source, team)

inputs = ImportDataActivityInputs(
team_id=team.id,
run_id=str(new_job.pk),
Expand Down Expand Up @@ -425,19 +426,20 @@ async def setup_job_1():
job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
)

customer_schema = await _create_schema("Customer", new_source, team)

# Already canceled so it should only run once
# This imitates if the job was canceled mid run
new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(
team_id=team.id,
pipeline_id=new_source.pk,
status=ExternalDataJob.Status.CANCELLED,
rows_synced=0,
schema=customer_schema,
)

new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()

customer_schema = await _create_schema("Customer", new_source, team)

inputs = ImportDataActivityInputs(
team_id=team.id,
run_id=str(new_job.pk),
Expand Down Expand Up @@ -510,17 +512,18 @@ async def setup_job_1():
job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
)

customer_schema = await _create_schema("Customer", new_source, team)

new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(
team_id=team.id,
pipeline_id=new_source.pk,
status=ExternalDataJob.Status.RUNNING,
rows_synced=0,
schema=customer_schema,
)

new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()

customer_schema = await _create_schema("Customer", new_source, team)

inputs = ImportDataActivityInputs(
team_id=team.id,
run_id=str(new_job.pk),
Expand Down Expand Up @@ -675,17 +678,18 @@ async def setup_job_1():
},
)

posthog_test_schema = await _create_schema("posthog_test", new_source, team)

new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(
team_id=team.id,
pipeline_id=new_source.pk,
status=ExternalDataJob.Status.RUNNING,
rows_synced=0,
schema=posthog_test_schema,
)

new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()

posthog_test_schema = await _create_schema("posthog_test", new_source, team)

inputs = ImportDataActivityInputs(
team_id=team.id, run_id=str(new_job.pk), source_id=new_source.pk, schema_id=posthog_test_schema.id
)
Expand Down
2 changes: 1 addition & 1 deletion posthog/warehouse/models/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Status(models.TextChoices):

@property
def folder_path(self) -> str:
return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.schema.pk)}".lower().replace("-", "_")
return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.schema_id)}".lower().replace("-", "_")

def url_pattern_by_schema(self, schema: str) -> str:
return f"https://{settings.AIRBYTE_BUCKET_DOMAIN}/dlt/{self.folder_path}/{schema.lower()}/*.parquet"
Expand Down

0 comments on commit 8baacbf

Please sign in to comment.