Skip to content

Commit

Permalink
chore(data-warehouse): remove nested call from data job (#20017)
Browse files Browse the repository at this point in the history
* remove nested call from data job

* try different strategy

* use db cleanup decorator from django channels

* add comment

* fix

* working tests

* add ignore

* remove unnecessary after refactor

* one more type ignore

* more types..

* remove experiment

* restore test

* remove redundant

* unused type ignore
  • Loading branch information
EDsCODE authored Jan 31, 2024
1 parent ce02cfb commit 667ace1
Show file tree
Hide file tree
Showing 12 changed files with 209 additions and 145 deletions.
9 changes: 4 additions & 5 deletions posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs
from posthog.warehouse.external_data_source.jobs import (
create_external_data_job,
get_external_data_job,
update_external_job_status,
)
from posthog.warehouse.models import (
ExternalDataJob,
get_active_schemas_for_source_id,
sync_old_schemas_with_new_schemas,
ExternalDataSource,
get_external_data_job,
)
from posthog.warehouse.models.external_data_schema import get_postgres_schemas
from posthog.temporal.common.logger import bind_temporal_worker_logger
Expand Down Expand Up @@ -116,7 +116,7 @@ class ValidateSchemaInputs:

@activity.defn
async def validate_schema_activity(inputs: ValidateSchemaInputs) -> None:
await sync_to_async(validate_schema_and_update_table)( # type: ignore
await validate_schema_and_update_table(
run_id=inputs.run_id,
team_id=inputs.team_id,
schemas=inputs.schemas,
Expand Down Expand Up @@ -144,9 +144,8 @@ class ExternalDataJobInputs:

@activity.defn
async def run_external_data_job(inputs: ExternalDataJobInputs) -> None:
model: ExternalDataJob = await sync_to_async(get_external_data_job)( # type: ignore
team_id=inputs.team_id,
run_id=inputs.run_id,
model: ExternalDataJob = await get_external_data_job(
job_id=inputs.run_id,
)

logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
Expand Down
121 changes: 119 additions & 2 deletions posthog/temporal/tests/external_data/test_external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
ExternalDataJob,
ExternalDataSource,
ExternalDataSchema,
DataWarehouseCredential,
)

from posthog.temporal.data_imports.pipelines.schemas import (
Expand Down Expand Up @@ -361,6 +362,122 @@ async def test_validate_schema_and_update_table_activity(activity_environment, t
)

assert mock_get_columns.call_count == 10
assert (
await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 5 # type: ignore
)


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_validate_schema_and_update_table_activity_with_existing(activity_environment, team, **kwargs):
    """Validation should still produce one table per schema when a table already exists for one schema."""
    source = await sync_to_async(ExternalDataSource.objects.create)(
        source_id=uuid.uuid4(),
        connection_id=uuid.uuid4(),
        destination_id=uuid.uuid4(),
        team=team,
        status="running",
        source_type="Stripe",
        job_inputs={"stripe_secret_key": "test-key"},
    )  # type: ignore

    # Previously completed job — only used to derive the URL pattern for the
    # table that pre-exists before the activity runs.
    completed_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(  # type: ignore
        team_id=team.id,
        pipeline_id=source.pk,
        status=ExternalDataJob.Status.COMPLETED,
        rows_synced=0,
    )

    credential = await sync_to_async(DataWarehouseCredential.objects.create)(  # type: ignore
        team=team,
        access_key=settings.OBJECT_STORAGE_ACCESS_KEY_ID,
        access_secret=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
    )

    existing_url_pattern = await sync_to_async(completed_job.url_pattern_by_schema)("test-1")  # type: ignore

    # Table for the "test-1" schema left over from the completed job.
    await sync_to_async(DataWarehouseTable.objects.create)(  # type: ignore
        credential=credential,
        name="stripe_test-1",
        format="Parquet",
        url_pattern=existing_url_pattern,
        team_id=team.pk,
        external_data_source_id=source.pk,
    )

    running_job = await sync_to_async(ExternalDataJob.objects.create)(  # type: ignore
        team_id=team.id,
        pipeline_id=source.pk,
        status=ExternalDataJob.Status.RUNNING,
        rows_synced=0,
    )

    with mock.patch(
        "posthog.warehouse.models.table.DataWarehouseTable.get_columns"
    ) as mock_get_columns, override_settings(**AWS_BUCKET_MOCK_SETTINGS):
        mock_get_columns.return_value = {"id": "string"}

        await activity_environment.run(
            validate_schema_activity,
            ValidateSchemaInputs(
                run_id=running_job.pk, team_id=team.id, schemas=["test-1", "test-2", "test-3", "test-4", "test-5"]
            ),
        )

    # 5 schemas x 2 get_columns calls each — NOTE(review): confirm why the
    # activity invokes get_columns twice per schema.
    assert mock_get_columns.call_count == 10
    # Exactly one table per schema for this source, the pre-existing one included.
    assert (
        await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=source.pk).count)() == 5  # type: ignore
    )


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_validate_schema_and_update_table_activity_half_run(activity_environment, team, **kwargs):
    # When validation raises for one schema but succeeds for another, only the
    # successful schema should end up with a DataWarehouseTable row.
    new_source = await sync_to_async(ExternalDataSource.objects.create)(  # type: ignore
        source_id=uuid.uuid4(),
        connection_id=uuid.uuid4(),
        destination_id=uuid.uuid4(),
        team=team,
        status="running",
        source_type="Stripe",
        job_inputs={"stripe_secret_key": "test-key"},
    )

    new_job = await sync_to_async(ExternalDataJob.objects.create)(  # type: ignore
        team_id=team.id,
        pipeline_id=new_source.pk,
        status=ExternalDataJob.Status.RUNNING,
        rows_synced=0,
    )

    with mock.patch("posthog.warehouse.models.table.DataWarehouseTable.get_columns") as mock_get_columns, mock.patch(
        "posthog.warehouse.data_load.validate_schema.validate_schema",
    ) as mock_validate, override_settings(**AWS_BUCKET_MOCK_SETTINGS):
        mock_get_columns.return_value = {"id": "string"}
        credential = await sync_to_async(DataWarehouseCredential.objects.create)(  # type: ignore
            team=team,
            access_key=settings.OBJECT_STORAGE_ACCESS_KEY_ID,
            access_secret=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
        )

        # side_effect order mirrors the `schemas` list below: the first call
        # ("broken_schema") raises, the second ("test_schema") returns a dict —
        # presumably consumed as table-creation kwargs; verify against the
        # activity implementation.
        mock_validate.side_effect = [
            Exception,
            {
                "credential": credential,
                "format": "Parquet",
                "name": "test_schema",
                "url_pattern": "test_url_pattern",
                "team_id": team.pk,
            },
        ]

        await activity_environment.run(
            validate_schema_activity,
            ValidateSchemaInputs(run_id=new_job.pk, team_id=team.id, schemas=["broken_schema", "test_schema"]),
        )

    # get_columns runs only for the schema whose validation succeeded.
    assert mock_get_columns.call_count == 1
    # Only one table was created — the failed schema produced none.
    assert (
        await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 1  # type: ignore
    )


@pytest.mark.django_db(transaction=True)
Expand Down Expand Up @@ -446,7 +563,7 @@ async def test_external_data_job_workflow_blank(team, **kwargs):
retry_policy=RetryPolicy(maximum_attempts=1),
)

run = await sync_to_async(get_latest_run_if_exists)(team_id=team.pk, pipeline_id=new_source.pk) # type: ignore
run = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=new_source.pk)
assert run is not None
assert run.status == ExternalDataJob.Status.COMPLETED

Expand Down Expand Up @@ -509,7 +626,7 @@ async def mock_async_func(inputs):
retry_policy=RetryPolicy(maximum_attempts=1),
)

run = await sync_to_async(get_latest_run_if_exists)(team_id=team.pk, pipeline_id=new_source.pk) # type: ignore
run = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=new_source.pk)

assert run is not None
assert run.status == ExternalDataJob.Status.COMPLETED
Expand Down
Empty file.
116 changes: 0 additions & 116 deletions posthog/warehouse/data_load/test/test_validate_schema.py

This file was deleted.

Loading

0 comments on commit 667ace1

Please sign in to comment.