Skip to content

Commit

Permalink
feat(data-warehouse): external data job rewrite (#21494)
Browse files Browse the repository at this point in the history
* WIP

* Reworked the worker to self-manage making schema schedules and use async temporal calls

* Added schema status and use it for the job status

* Fixed existing tests

* Added new tests to cover check_schedule_activity

* Updated the source API to trigger active schemas

* Added master changes for stripe source

* Updated mypy

* add blank to field

* update migrations

* update mypy

* fix types

* Update query snapshots

* Update query snapshots

* fix types

* update mypy

* type ignore

* add comment

* add default args, fix missing schema sync creation, add deletion logic

* remove defaults

* add blank

* cleanup

* add failsafe

* update reload logic

* create new schemas if triggered between reloads

* add schema off check

---------

Co-authored-by: eric <[email protected]>
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Apr 17, 2024
1 parent fff6720 commit b2773cb
Show file tree
Hide file tree
Showing 23 changed files with 786 additions and 508 deletions.
16 changes: 7 additions & 9 deletions frontend/src/scenes/data-warehouse/new/NewSourceWizard.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { LemonButton } from '@posthog/lemon-ui'
import { useActions, useValues } from 'kea'
import { router } from 'kea-router'
import { PageHeader } from 'lib/components/PageHeader'
import { FEATURE_FLAGS } from 'lib/constants'
import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
Expand All @@ -10,7 +9,6 @@ import stripeLogo from 'public/stripe-logo.svg'
import zendeskLogo from 'public/zendesk-logo.png'
import { useCallback } from 'react'
import { SceneExport } from 'scenes/sceneTypes'
import { urls } from 'scenes/urls'

import { SourceConfig } from '~/types'

Expand All @@ -26,7 +24,7 @@ export const scene: SceneExport = {
}
export function NewSourceWizard(): JSX.Element {
const { modalTitle, modalCaption } = useValues(sourceWizardLogic)
const { onBack, onSubmit, closeWizard, cancelWizard } = useActions(sourceWizardLogic)
const { onBack, onSubmit, closeWizard } = useActions(sourceWizardLogic)
const { currentStep, isLoading, canGoBack, canGoNext, nextButtonText, showSkipButton } =
useValues(sourceWizardLogic)

Expand Down Expand Up @@ -65,17 +63,17 @@ export function NewSourceWizard(): JSX.Element {
)
}, [currentStep, isLoading, canGoNext, canGoBack, nextButtonText, showSkipButton])

const onCancel = (): void => {
cancelWizard()
router.actions.push(urls.dataWarehouse())
}

return (
<>
<PageHeader
buttons={
<>
<LemonButton type="secondary" center data-attr="source-form-cancel-button" onClick={onCancel}>
<LemonButton
type="secondary"
center
data-attr="source-form-cancel-button"
onClick={closeWizard}
>
Cancel
</LemonButton>
</>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ export const sourceWizardLogic = kea<sourceWizardLogicType>([
}
},
closeWizard: () => {
actions.onClear()
actions.clearSource()
actions.loadSources(null)
router.actions.push(urls.dataWarehouseSettings())
Expand Down
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0401_experiment_exposure_cohort
posthog: 0402_externaldatajob_schema
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
21 changes: 10 additions & 11 deletions mypy-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ posthog/temporal/common/utils.py:0: error: Argument 1 to "abstractclassmethod" h
posthog/temporal/common/utils.py:0: note: This is likely because "from_activity" has named arguments: "cls". Consider marking them positional-only
posthog/temporal/common/utils.py:0: error: Argument 2 to "__get__" of "classmethod" has incompatible type "type[HeartbeatType]"; expected "type[Never]" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/talk_api.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "str") [assignment]
posthog/hogql/modifiers.py:0: error: Incompatible types in assignment (expression has type "PersonOnEventsMode", variable has type "PersonsOnEventsMode | None") [assignment]
posthog/hogql/database/argmax.py:0: error: Argument "chain" to "Field" has incompatible type "list[str]"; expected "list[str | int]" [arg-type]
posthog/hogql/database/argmax.py:0: note: "List" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance
posthog/hogql/database/argmax.py:0: note: Consider using "Sequence" instead, which is covariant
Expand Down Expand Up @@ -134,6 +133,7 @@ posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict ent
posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "LifecycleFilter"; expected "str": "TrendsFilter" [dict-item]
posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "StickinessFilter"; expected "str": "TrendsFilter" [dict-item]
posthog/hogql_queries/legacy_compatibility/feature_flag.py:0: error: Item "AnonymousUser" of "User | AnonymousUser" has no attribute "email" [union-attr]
posthog/hogql/modifiers.py:0: error: Incompatible types in assignment (expression has type "PersonOnEventsMode", variable has type "PersonsOnEventsMode | None") [assignment]
posthog/api/utils.py:0: error: Incompatible types in assignment (expression has type "type[EventDefinition]", variable has type "type[EnterpriseEventDefinition]") [assignment]
posthog/api/utils.py:0: error: Argument 1 to "UUID" has incompatible type "int | str"; expected "str | None" [arg-type]
ee/billing/quota_limiting.py:0: error: List comprehension has incompatible type List[int]; expected List[str] [misc]
Expand Down Expand Up @@ -167,13 +167,6 @@ posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item "
posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item "None" of "list[Expr] | Any | None" has no attribute "append" [union-attr]
ee/billing/billing_manager.py:0: error: TypedDict "CustomerInfo" has no key "available_product_features" [typeddict-item]
ee/billing/billing_manager.py:0: note: Did you mean "available_features"?
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Item "None" of "DateTime | None" has no attribute "int_timestamp" [union-attr]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/hogql/resolver.py:0: error: Argument 1 of "visit" is incompatible with supertype "Visitor"; supertype defines the argument type as "AST" [override]
posthog/hogql/resolver.py:0: note: This violates the Liskov substitution principle
posthog/hogql/resolver.py:0: note: See https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
Expand Down Expand Up @@ -233,9 +226,6 @@ posthog/hogql/resolver.py:0: error: Argument 1 to "get_child" of "Type" has inco
posthog/hogql/resolver.py:0: error: Incompatible types in assignment (expression has type "Expr", variable has type "Alias") [assignment]
posthog/hogql/resolver.py:0: error: Argument "alias" to "Alias" has incompatible type "str | int"; expected "str" [arg-type]
posthog/hogql/resolver.py:0: error: Argument 1 to "join" of "str" has incompatible type "list[str | int]"; expected "Iterable[str]" [arg-type]
posthog/temporal/data_imports/external_data_job.py:0: error: Argument "team_id" has incompatible type "int"; expected "str" [arg-type]
posthog/temporal/data_imports/external_data_job.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/external_data_job.py:0: error: Argument "team_id" has incompatible type "int"; expected "str" [arg-type]
posthog/hogql/transforms/lazy_tables.py:0: error: Incompatible default for argument "context" (default has type "None", argument has type "HogQLContext") [assignment]
posthog/hogql/transforms/lazy_tables.py:0: note: PEP 484 prohibits implicit Optional. Accordingly, mypy has changed its default to no_implicit_optional=True
posthog/hogql/transforms/lazy_tables.py:0: note: Use https://github.com/hauntsaninja/no_implicit_optional to automatically upgrade your codebase
Expand Down Expand Up @@ -577,6 +567,15 @@ posthog/hogql/database/schema/event_sessions.py:0: error: Statement is unreachab
posthog/api/organization_member.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc]
ee/api/role.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc]
ee/clickhouse/views/insights.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc]
posthog/temporal/data_imports/workflow_activities/create_job_model.py:0: error: Argument 6 has incompatible type "ExternalDataSchema"; expected "str" [arg-type]
posthog/temporal/data_imports/workflow_activities/create_job_model.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Item "None" of "DateTime | None" has no attribute "int_timestamp" [union-attr]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/queries/trends/test/test_person.py:0: error: "str" has no attribute "get" [attr-defined]
posthog/queries/trends/test/test_person.py:0: error: Invalid index type "int" for "HttpResponse"; expected type "str | bytes" [index]
posthog/queries/trends/test/test_person.py:0: error: "str" has no attribute "get" [attr-defined]
Expand Down
25 changes: 25 additions & 0 deletions posthog/migrations/0402_externaldatajob_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Generated by Django 4.1.13 on 2024-04-15 14:32

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
    """Add a ``schema`` foreign key to external data jobs and a ``status`` column to schemas."""

    dependencies = [("posthog", "0401_experiment_exposure_cohort")]

    operations = [
        # Optional (nullable, blank-able) link from a job to an external data schema;
        # the job row is removed when the referenced schema is deleted (CASCADE).
        migrations.AddField(
            model_name="externaldatajob",
            name="schema",
            field=models.ForeignKey(
                to="posthog.externaldataschema",
                on_delete=django.db.models.deletion.CASCADE,
                null=True,
                blank=True,
            ),
        ),
        # Optional free-form status string (up to 400 characters) on the schema.
        migrations.AddField(
            model_name="externaldataschema",
            name="status",
            field=models.CharField(
                max_length=400,
                blank=True,
                null=True,
            ),
        ),
    ]
13 changes: 13 additions & 0 deletions posthog/temporal/common/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,16 @@ async def sync_connect() -> Client:
settings.TEMPORAL_CLIENT_KEY,
)
return client


async def async_connect() -> Client:
    """Connect to Temporal from async code and return the resulting Client.

    The host, port, namespace, client root CA, client cert and client key are all
    read from ``settings`` and passed positionally, in that order, to ``connect``.
    """
    return await connect(
        settings.TEMPORAL_HOST,
        settings.TEMPORAL_PORT,
        settings.TEMPORAL_NAMESPACE,
        settings.TEMPORAL_CLIENT_ROOT_CA,
        settings.TEMPORAL_CLIENT_CERT,
        settings.TEMPORAL_CLIENT_KEY,
    )
44 changes: 43 additions & 1 deletion posthog/temporal/common/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ async def create_schedule(temporal: Client, id: str, schedule: Schedule, trigger
)


async def a_create_schedule(temporal: Client, id: str, schedule: Schedule, trigger_immediately: bool = False):
    """Create a Temporal Schedule from async code.

    Awaits ``temporal.create_schedule`` with the given ``id``, ``schedule`` and
    ``trigger_immediately`` flag, and returns whatever that call returns.
    """
    created = await temporal.create_schedule(
        id=id,
        schedule=schedule,
        trigger_immediately=trigger_immediately,
    )
    return created


@async_to_sync
async def update_schedule(temporal: Client, id: str, schedule: Schedule) -> None:
"""Update a Temporal Schedule."""
Expand All @@ -25,6 +34,18 @@ async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate:
)


async def a_update_schedule(temporal: Client, id: str, schedule: Schedule) -> None:
    """Replace the definition of the Temporal Schedule ``id`` from async code.

    The handle for ``id`` is looked up on ``temporal`` and its ``update`` is awaited
    with an updater that always answers with the supplied ``schedule``.
    """

    async def replace_schedule(_: ScheduleUpdateInput) -> ScheduleUpdate:
        # The update input is deliberately ignored: the new schedule is used as-is.
        return ScheduleUpdate(schedule=schedule)

    schedule_handle = temporal.get_schedule_handle(id)
    return await schedule_handle.update(updater=replace_schedule)


@async_to_sync
async def unpause_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None:
"""Unpause a Temporal Schedule."""
Expand All @@ -39,6 +60,12 @@ async def delete_schedule(temporal: Client, schedule_id: str) -> None:
await handle.delete()


async def a_delete_schedule(temporal: Client, schedule_id: str) -> None:
    """Delete the Temporal Schedule ``schedule_id`` from async code.

    Looks up the schedule handle on ``temporal`` and awaits its ``delete``.
    """
    await temporal.get_schedule_handle(schedule_id).delete()


@async_to_sync
async def describe_schedule(temporal: Client, schedule_id: str):
"""Describe a Temporal Schedule."""
Expand All @@ -55,6 +82,21 @@ async def pause_schedule(temporal: Client, schedule_id: str, note: str | None =

@async_to_sync
async def trigger_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None:
    """Trigger a Temporal Schedule.

    Looks up the handle for ``schedule_id`` on ``temporal`` and awaits its ``trigger``.

    ``note`` is accepted but not used by this function: it is never passed on to
    the trigger call.
    """
    handle = temporal.get_schedule_handle(schedule_id)
    await handle.trigger()


async def a_trigger_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None:
    """Trigger the Temporal Schedule ``schedule_id`` from async code.

    ``note`` is part of the signature but is not used: nothing is forwarded to the
    underlying ``trigger`` call.
    """
    await temporal.get_schedule_handle(schedule_id).trigger()


async def a_schedule_exists(temporal: Client, schedule_id: str) -> bool:
    """Check whether a schedule exists.

    Awaits ``describe()`` on the handle for ``schedule_id``.

    Returns:
        True when ``describe()`` completes, False when it raises any ``Exception``.

    Note:
        Only ``Exception`` subclasses are treated as "schedule does not exist".
        ``BaseException``-level signals such as ``asyncio.CancelledError``,
        ``KeyboardInterrupt`` and ``SystemExit`` propagate to the caller, so
        cancelling the surrounding task is not misreported as a missing schedule.
    """
    try:
        await temporal.get_schedule_handle(schedule_id).describe()
    except Exception:
        # Best-effort probe: any ordinary failure of describe() counts as "not found".
        return False
    return True
10 changes: 6 additions & 4 deletions posthog/temporal/data_imports/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
from posthog.temporal.data_imports.external_data_job import (
ExternalDataJobWorkflow,
create_external_data_job_model,
create_external_data_job_model_activity,
create_source_templates,
run_external_data_job,
import_data_activity,
update_external_data_job_model,
validate_schema_activity,
check_schedule_activity,
)

WORKFLOWS = [ExternalDataJobWorkflow]

ACTIVITIES = [
create_external_data_job_model,
create_external_data_job_model_activity,
update_external_data_job_model,
run_external_data_job,
import_data_activity,
validate_schema_activity,
create_source_templates,
check_schedule_activity,
]
Loading

0 comments on commit b2773cb

Please sign in to comment.