feat(data-warehouse): Change external data job to be all synchronous (#…
Gilbert09 authored Nov 7, 2024
1 parent fb28731 commit d95550f
Showing 17 changed files with 490 additions and 1,184 deletions.
17 changes: 8 additions & 9 deletions mypy-baseline.txt
@@ -617,13 +617,12 @@ posthog/warehouse/api/external_data_schema.py:0: note: def [_T] get(self, Type,
posthog/warehouse/api/table.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/warehouse/api/table.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/warehouse/api/table.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: error: Argument 1 has incompatible type "str"; expected "Type" [arg-type]
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: error: No overload variant of "get" of "dict" matches argument types "str", "tuple[()]" [call-overload]
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: note: Possible overload variants:
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: note: def get(self, Type, /) -> Sequence[str] | None
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: note: def get(self, Type, Sequence[str], /) -> Sequence[str]
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: note: def [_T] get(self, Type, _T, /) -> Sequence[str] | _T
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: error: Argument "source_id" has incompatible type "str"; expected "UUID" [arg-type]
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: error: Argument "source_id" to "sync_old_schemas_with_new_schemas" has incompatible type "str"; expected "UUID" [arg-type]
posthog/tasks/exports/test/test_csv_exporter.py:0: error: Function is missing a return type annotation [no-untyped-def]
posthog/tasks/exports/test/test_csv_exporter.py:0: error: Function is missing a type annotation [no-untyped-def]
posthog/tasks/exports/test/test_csv_exporter.py:0: error: Function is missing a type annotation for one or more arguments [no-untyped-def]
@@ -796,6 +795,11 @@ posthog/temporal/tests/batch_exports/test_batch_exports.py:0: error: TypedDict k
posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 20 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 21 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 22 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: "FilesystemDestinationClientConfiguration" has no attribute "delta_jobs_per_write" [attr-defined]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: "type[FilesystemDestinationClientConfiguration]" has no attribute "delta_jobs_per_write" [attr-defined]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "DataWarehouseCredential | Combinable | None") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "str | int | Combinable") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "dict[str, dict[str, str | bool]] | dict[str, str]", variable has type "dict[str, dict[str, str]]") [assignment]
posthog/session_recordings/session_recording_api.py:0: error: Argument "team_id" to "get_realtime_snapshots" has incompatible type "int"; expected "str" [arg-type]
posthog/session_recordings/session_recording_api.py:0: error: Value of type variable "SupportsRichComparisonT" of "sorted" cannot be "str | None" [type-var]
posthog/session_recordings/session_recording_api.py:0: error: Argument 1 to "get" of "dict" has incompatible type "str | None"; expected "str" [arg-type]
@@ -826,19 +830,14 @@ posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0:
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: List item 0 has incompatible type "tuple[str, str, int, int, int, int, str, int]"; expected "tuple[str, str, int, int, str, str, str, str]" [list-item]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "last_uploaded_part_timestamp" [attr-defined]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "upload_state" [attr-defined]
posthog/temporal/data_imports/workflow_activities/import_data.py:0: error: Argument "job_type" to "PipelineInputs" has incompatible type "str"; expected "Type" [arg-type]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: "FilesystemDestinationClientConfiguration" has no attribute "delta_jobs_per_write" [attr-defined]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: "type[FilesystemDestinationClientConfiguration]" has no attribute "delta_jobs_per_write" [attr-defined]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "DataWarehouseCredential | Combinable | None") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "str | int | Combinable") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "dict[str, dict[str, str | bool]] | dict[str, str]", variable has type "dict[str, dict[str, str]]") [assignment]
posthog/migrations/0237_remove_timezone_from_teams.py:0: error: Argument 2 to "RunPython" has incompatible type "Callable[[Migration, Any], None]"; expected "_CodeCallable | None" [arg-type]
posthog/migrations/0228_fix_tile_layouts.py:0: error: Argument 2 to "RunPython" has incompatible type "Callable[[Migration, Any], None]"; expected "_CodeCallable | None" [arg-type]
posthog/api/plugin_log_entry.py:0: error: Name "timezone.datetime" is not defined [name-defined]
posthog/api/plugin_log_entry.py:0: error: Module "django.utils.timezone" does not explicitly export attribute "datetime" [attr-defined]
posthog/api/plugin_log_entry.py:0: error: Name "timezone.datetime" is not defined [name-defined]
posthog/api/plugin_log_entry.py:0: error: Module "django.utils.timezone" does not explicitly export attribute "datetime" [attr-defined]
posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py:0: error: Incompatible types in assignment (expression has type "str | int", variable has type "int") [assignment]
posthog/temporal/data_imports/external_data_job.py:0: error: Argument "status" to "update_external_job_status" has incompatible type "str"; expected "Status" [arg-type]
posthog/api/sharing.py:0: error: Item "None" of "list[Any] | None" has no attribute "__iter__" (not iterable) [union-attr]
posthog/api/test/batch_exports/conftest.py:0: error: Signature of "run" incompatible with supertype "Worker" [override]
posthog/api/test/batch_exports/conftest.py:0: note: Superclass:
@@ -850,10 +849,10 @@ posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/data_imports/test_end_to_end.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/api/test/test_team.py:0: error: "HttpResponse" has no attribute "json" [attr-defined]
posthog/api/test/test_team.py:0: error: "HttpResponse" has no attribute "json" [attr-defined]
posthog/test/test_middleware.py:0: error: Incompatible types in assignment (expression has type "_MonkeyPatchedWSGIResponse", variable has type "_MonkeyPatchedResponse") [assignment]
posthog/temporal/tests/data_imports/test_end_to_end.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/management/commands/test/test_create_batch_export_from_app.py:0: error: Incompatible return value type (got "dict[str, Collection[str]]", expected "dict[str, str]") [return-value]
posthog/management/commands/test/test_create_batch_export_from_app.py:0: error: Incompatible types in assignment (expression has type "dict[str, Collection[str]]", variable has type "dict[str, str]") [assignment]
posthog/management/commands/test/test_create_batch_export_from_app.py:0: error: Unpacked dict entry 1 has incompatible type "str"; expected "SupportsKeysAndGetItem[str, str]" [dict-item]
2 changes: 2 additions & 0 deletions posthog/temporal/common/heartbeat_sync.py
@@ -11,6 +11,8 @@ def __init__(self, details: tuple[Any, ...] = (), factor: int = 12, logger: Opti
self.details: tuple[Any, ...] = details
self.factor = factor
self.logger = logger
self.stop_event: Optional[threading.Event] = None
self.heartbeat_thread: Optional[threading.Thread] = None

def log_debug(self, message: str, exc_info: Optional[Any] = None) -> None:
if self.logger:
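
For context, the two attributes added above are the moving parts of a thread-based heartbeater. Below is a minimal, illustrative sketch (not the repository's implementation) of how they could be wired together; it assumes temporalio's activity.heartbeat() / activity.info() APIs, and deriving the interval from the heartbeat timeout divided by factor is an assumption based on the constructor shown above.

# Illustrative sketch only; everything beyond stop_event/heartbeat_thread/factor is assumed.
import contextvars
import threading
from typing import Any, Optional

from temporalio import activity


class HeartbeaterSyncSketch:
    def __init__(self, details: tuple[Any, ...] = (), factor: int = 12) -> None:
        self.details = details
        self.factor = factor
        self.stop_event: Optional[threading.Event] = None
        self.heartbeat_thread: Optional[threading.Thread] = None

    def __enter__(self) -> "HeartbeaterSyncSketch":
        timeout = activity.info().heartbeat_timeout
        interval = timeout.total_seconds() / self.factor if timeout else 30.0

        stop_event = threading.Event()
        # Copy the activity's contextvars so activity.heartbeat() can resolve the
        # activity context from the spawned thread.
        ctx = contextvars.copy_context()

        def beat() -> None:
            # Keep heartbeating until the activity body finishes and sets the event.
            while not stop_event.wait(interval):
                activity.heartbeat(*self.details)

        thread = threading.Thread(target=lambda: ctx.run(beat), daemon=True)
        thread.start()

        self.stop_event = stop_event
        self.heartbeat_thread = thread
        return self

    def __exit__(self, *exc: Any) -> None:
        if self.stop_event is not None:
            self.stop_event.set()
        if self.heartbeat_thread is not None:
            self.heartbeat_thread.join()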
4 changes: 0 additions & 4 deletions posthog/temporal/data_imports/__init__.py
@@ -2,10 +2,8 @@
ExternalDataJobWorkflow,
create_external_data_job_model_activity,
create_source_templates,
import_data_activity,
import_data_activity_sync,
update_external_data_job_model,
check_schedule_activity,
check_billing_limits_activity,
sync_new_schemas_activity,
)
@@ -15,10 +13,8 @@
ACTIVITIES = [
create_external_data_job_model_activity,
update_external_data_job_model,
import_data_activity,
import_data_activity_sync,
create_source_templates,
check_schedule_activity,
check_billing_limits_activity,
sync_new_schemas_activity,
]
103 changes: 22 additions & 81 deletions posthog/temporal/data_imports/external_data_job.py
@@ -8,7 +8,6 @@

# TODO: remove dependency
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.data_imports.util import is_posthog_team
from posthog.temporal.data_imports.workflow_activities.check_billing_limits import (
CheckBillingLimitsActivityInputs,
check_billing_limits_activity,
@@ -23,28 +22,19 @@
CreateExternalDataJobModelActivityInputs,
create_external_data_job_model_activity,
)
from posthog.temporal.data_imports.workflow_activities.import_data import ImportDataActivityInputs, import_data_activity
from posthog.temporal.data_imports.workflow_activities.import_data_sync import ImportDataActivityInputs
from posthog.utils import get_machine_id
from posthog.warehouse.data_load.service import (
a_delete_external_data_schedule,
a_external_data_workflow_exists,
a_sync_external_data_job_workflow,
a_trigger_external_data_workflow,
)
from posthog.warehouse.data_load.source_templates import create_warehouse_templates_for_source

from posthog.warehouse.external_data_source.jobs import (
aget_running_job_for_schema,
aupdate_external_job_status,
update_external_job_status,
)
from posthog.warehouse.models import (
ExternalDataJob,
get_active_schemas_for_source_id,
ExternalDataSource,
get_external_data_source,
)
from posthog.temporal.common.logger import bind_temporal_worker_logger
from posthog.warehouse.models.external_data_schema import aupdate_should_sync
from posthog.temporal.common.logger import bind_temporal_worker_logger_sync
from posthog.warehouse.models.external_data_schema import update_should_sync


Non_Retryable_Schema_Errors: dict[ExternalDataSource.Type, list[str]] = {
@@ -76,11 +66,15 @@ class UpdateExternalDataJobStatusInputs:


@activity.defn
async def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInputs) -> None:
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInputs) -> None:
logger = bind_temporal_worker_logger_sync(team_id=inputs.team_id)

if inputs.job_id is None:
job: ExternalDataJob | None = await aget_running_job_for_schema(inputs.schema_id)
job: ExternalDataJob | None = (
ExternalDataJob.objects.filter(schema_id=inputs.schema_id, status=ExternalDataJob.Status.RUNNING)
.order_by("-created_at")
.first()
)
if job is None:
logger.info("No job to update status on")
return
@@ -94,7 +88,7 @@ async def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInpu
f"External data job failed for external data schema {inputs.schema_id} with error: {inputs.internal_error}"
)

source: ExternalDataSource = await get_external_data_source(inputs.source_id)
source: ExternalDataSource = ExternalDataSource.objects.get(pk=inputs.source_id)
non_retryable_errors = Non_Retryable_Schema_Errors.get(ExternalDataSource.Type(source.source_type))

if non_retryable_errors is not None:
@@ -113,9 +107,9 @@ async def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInpu
"error": inputs.internal_error,
},
)
await aupdate_should_sync(schema_id=inputs.schema_id, team_id=inputs.team_id, should_sync=False)
update_should_sync(schema_id=inputs.schema_id, team_id=inputs.team_id, should_sync=False)

await aupdate_external_job_status(
update_external_job_status(
job_id=job_id,
status=inputs.status,
latest_error=inputs.latest_error,
@@ -134,34 +128,8 @@ class CreateSourceTemplateInputs:


@activity.defn
async def create_source_templates(inputs: CreateSourceTemplateInputs) -> None:
await create_warehouse_templates_for_source(team_id=inputs.team_id, run_id=inputs.run_id)


@activity.defn
async def check_schedule_activity(inputs: ExternalDataWorkflowInputs) -> bool:
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)

# Creates schedules for all schemas if they don't exist yet, and then remove itself as a source schedule
if inputs.external_data_schema_id is None:
logger.info("Schema ID is none, creating schedules for schemas...")
schemas = await get_active_schemas_for_source_id(
team_id=inputs.team_id, source_id=inputs.external_data_source_id
)
for schema in schemas:
if await a_external_data_workflow_exists(schema.id):
await a_trigger_external_data_workflow(schema)
logger.info(f"Schedule exists for schema {schema.id}. Triggered schedule")
else:
await a_sync_external_data_job_workflow(schema, create=True)
logger.info(f"Created schedule for schema {schema.id}")
# Delete the source schedule in favour of the schema schedules
await a_delete_external_data_schedule(ExternalDataSource(id=inputs.external_data_source_id))
logger.info(f"Deleted schedule for source {inputs.external_data_source_id}")
return True

logger.info("Schema ID is set. Continuing...")
return False
def create_source_templates(inputs: CreateSourceTemplateInputs) -> None:
create_warehouse_templates_for_source(team_id=inputs.team_id, run_id=inputs.run_id)
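
The same conversion pattern runs through the whole file: async def activities that awaited a*-prefixed helpers become plain def activities that use the Django ORM (or sync helpers) directly. A hedged sketch of why that simplifies things, with hypothetical activity names (only ExternalDataJob comes from this codebase):

# Illustrative only: hypothetical activity names showing the async-vs-sync ORM difference.
from asgiref.sync import sync_to_async
from temporalio import activity

from posthog.warehouse.models import ExternalDataJob


@activity.defn
async def get_job_status_async(job_id: str) -> str:
    # Inside an async activity a direct ORM call raises SynchronousOnlyOperation,
    # so every database access needs an async wrapper or an a*-prefixed helper.
    job = await sync_to_async(ExternalDataJob.objects.get)(pk=job_id)
    return str(job.status)


@activity.defn
def get_job_status(job_id: str) -> str:
    # In a synchronous activity the ORM can be called directly, as the
    # rewritten activities in this commit now do.
    job = ExternalDataJob.objects.get(pk=job_id)
    return str(job.status)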


# TODO: update retry policies
@@ -174,21 +142,6 @@ def parse_inputs(inputs: list[str]) -> ExternalDataWorkflowInputs:

@workflow.run
async def run(self, inputs: ExternalDataWorkflowInputs):
should_exit = await workflow.execute_activity(
check_schedule_activity,
inputs,
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=0,
non_retryable_error_types=["NotNullViolation", "IntegrityError"],
),
)

if should_exit:
return

assert inputs.external_data_schema_id is not None

update_inputs = UpdateExternalDataJobStatusInputs(
@@ -262,24 +215,12 @@ async def run(self, inputs: ExternalDataWorkflowInputs):
else {"start_to_close_timeout": dt.timedelta(hours=12), "retry_policy": RetryPolicy(maximum_attempts=3)}
)

if is_posthog_team(inputs.team_id) and (
source_type == ExternalDataSource.Type.POSTGRES or source_type == ExternalDataSource.Type.BIGQUERY
):
# Sync activity for testing
await workflow.execute_activity(
import_data_activity_sync,
job_inputs,
heartbeat_timeout=dt.timedelta(minutes=5),
**timeout_params,
) # type: ignore
else:
# Async activity for everyone else
await workflow.execute_activity(
import_data_activity,
job_inputs,
heartbeat_timeout=dt.timedelta(minutes=5),
**timeout_params,
) # type: ignore
await workflow.execute_activity(
import_data_activity_sync,
job_inputs,
heartbeat_timeout=dt.timedelta(minutes=5),
**timeout_params,
) # type: ignore

# Create source templates
await workflow.execute_activity(
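
With import_data_activity and check_schedule_activity gone, everything this workflow executes is a synchronous activity, and temporalio requires non-async activities to be registered with an activity_executor on the worker. A sketch of that registration follows; the task queue name and worker sizing are placeholders, not values taken from this commit.

# Sketch of registering the now-synchronous activities with a Temporal worker.
from concurrent.futures import ThreadPoolExecutor

from temporalio.client import Client
from temporalio.worker import Worker

from posthog.temporal.data_imports import ACTIVITIES, ExternalDataJobWorkflow


async def run_worker(client: Client) -> None:
    worker = Worker(
        client,
        task_queue="data-warehouse-task-queue",  # placeholder name
        workflows=[ExternalDataJobWorkflow],
        activities=ACTIVITIES,
        # Synchronous (non-async) activities run on this executor.
        activity_executor=ThreadPoolExecutor(max_workers=50),
    )
    await worker.run()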