Skip to content

Commit

Permalink
Merge branch 'master' into mysql
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 authored Jul 23, 2024
2 parents 3acd08f + 0f55636 commit cfddab8
Show file tree
Hide file tree
Showing 19 changed files with 218 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
"WORKER_CONCURRENCY": "2",
"OBJECT_STORAGE_ENABLED": "True",
"HOG_HOOK_URL": "http://localhost:3300/hoghook",
"CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": "*"
"CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": ""
},
"presentation": {
"group": "main"
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 5 additions & 2 deletions frontend/src/scenes/pipeline/destinations/NewDestinations.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IconPlusSmall } from '@posthog/icons'
import { LemonButton, LemonInput, LemonSelect, LemonTable, LemonTag } from '@posthog/lemon-ui'
import { LemonButton, LemonInput, LemonSelect, LemonTable, LemonTag, Link } from '@posthog/lemon-ui'
import { useActions, useValues } from 'kea'
import { capitalizeFirstLetter } from 'kea-forms'
import { PayGateButton } from 'lib/components/PayGateMini/PayGateButton'
Expand All @@ -16,7 +16,7 @@ import { newDestinationsLogic } from './newDestinationsLogic'
export function DestinationOptionsTable(): JSX.Element {
const hogFunctionsEnabled = !!useFeatureFlag('HOG_FUNCTIONS')
const { loading, filteredDestinations, filters } = useValues(newDestinationsLogic)
const { setFilters } = useActions(newDestinationsLogic)
const { setFilters, openFeedbackDialog } = useActions(newDestinationsLogic)
const { canEnableNewDestinations } = useValues(pipelineAccessLogic)

return (
Expand All @@ -30,6 +30,9 @@ export function DestinationOptionsTable(): JSX.Element {
value={filters.search ?? ''}
onChange={(e) => setFilters({ search: e })}
/>
<Link className="text-sm font-semibold" subtle onClick={() => openFeedbackDialog()}>
Can't find what you're looking for?
</Link>
<div className="flex-1" />
<LemonSelect
type="secondary"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { LemonDialog, LemonInput, LemonTextArea, lemonToast } from '@posthog/lemon-ui'
import FuseClass from 'fuse.js'
import { actions, afterMount, connect, kea, path, reducers, selectors } from 'kea'
import { actions, afterMount, connect, kea, listeners, path, reducers, selectors } from 'kea'
import { loaders } from 'kea-loaders'
import { actionToUrl, combineUrl, router, urlToAction } from 'kea-router'
import api from 'lib/api'
import { LemonField } from 'lib/lemon-ui/LemonField'
import { objectsEqual } from 'lib/utils'
import posthog from 'posthog-js'
import { urls } from 'scenes/urls'
import { userLogic } from 'scenes/userLogic'

Expand Down Expand Up @@ -45,6 +48,7 @@ export const newDestinationsLogic = kea<newDestinationsLogicType>([
actions({
setFilters: (filters: Partial<NewDestinationFilters>) => ({ filters }),
resetFilters: true,
openFeedbackDialog: true,
}),
reducers({
filters: [
Expand Down Expand Up @@ -159,6 +163,41 @@ export const newDestinationsLogic = kea<newDestinationsLogicType>([
],
})),

listeners(({ values }) => ({
setFilters: async ({ filters }, breakpoint) => {
if (filters.search && filters.search.length > 2) {
await breakpoint(1000)
posthog.capture('cdp destination search', { search: filters.search })
}
},

openFeedbackDialog: async (_, breakpoint) => {
await breakpoint(100)
LemonDialog.openForm({
title: 'What destination would you like to see?',
initialValues: { destination_name: values.filters.search },
errors: {
destination_name: (x) => (!x ? 'Required' : undefined),
},
description: undefined,
content: (
<div className="space-y-2">
<LemonField name="destination_name" label="Destination">
<LemonInput placeholder="What destination would you like to see?" autoFocus />
</LemonField>
<LemonField name="destination_details" label="Additional information" showOptional>
<LemonTextArea placeholder="Any extra details about what you would need this destination to do or your overall goal" />
</LemonField>
</div>
),
onSubmit: async (values) => {
posthog.capture('cdp destination feedback', { ...values })
lemonToast.success('Thank you for your feedback!')
},
})
},
})),

actionToUrl(({ values }) => {
const urlFromFilters = (): [
string,
Expand Down
8 changes: 2 additions & 6 deletions frontend/src/scenes/surveys/SurveyView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {
SurveyType,
} from '~/types'

import { SURVEY_EVENT_NAME } from './constants'
import { SURVEY_EVENT_NAME, SurveyQuestionLabel } from './constants'
import { SurveyDisplaySummary } from './Survey'
import { SurveyAPIEditor } from './SurveyAPIEditor'
import { SurveyFormAppearance } from './SurveyFormAppearance'
Expand Down Expand Up @@ -293,11 +293,7 @@ export function SurveyView({ id }: { id: string }): JSX.Element {
{survey.questions[0].question && (
<>
<span className="card-secondary mt-4">Type</span>
<span>
{survey.questions.length > 1
? 'Multiple questions'
: capitalizeFirstLetter(survey.questions[0].type)}
</span>
<span>{SurveyQuestionLabel[survey.questions[0].type]}</span>
<span className="card-secondary mt-4">
{pluralize(
survey.questions.length,
Expand Down
4 changes: 3 additions & 1 deletion frontend/src/scenes/surveys/surveyActivityDescriber.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ describe('describeQuestionChanges', () => {
)
expect(getTextContent(changes[1])).toBe('made question optional')
expect(getTextContent(changes[2])).toBe('changed button text from "Next" to "Continue"')
expect(getTextContent(changes[3])).toBe('changed question type from single_choice to multiple_choice')
expect(getTextContent(changes[3])).toBe(
'changed question type from Single choice select to Multiple choice select'
)
expect(getTextContent(changes[4])).toBe('added choices: Maybe')
expect(getTextContent(changes[5])).toBe('updated branching logic')
})
Expand Down
5 changes: 4 additions & 1 deletion frontend/src/scenes/surveys/surveyActivityDescriber.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import {
SurveyQuestionType,
} from '~/types'

import { SurveyQuestionLabel } from './constants'

const isEmptyOrUndefined = (value: any): boolean => value === undefined || value === null || value === ''

const nameOrLinkToSurvey = (
Expand Down Expand Up @@ -440,7 +442,8 @@ export function describeQuestionChanges(before: SurveyQuestion, after: SurveyQue
before.type !== after.type
? [
<>
changed question type from <strong>{before.type}</strong> to <strong>{after.type}</strong>
changed question type from <strong>{SurveyQuestionLabel[before.type]}</strong> to{' '}
<strong>{SurveyQuestionLabel[after.type]}</strong>
</>,
]
: []
Expand Down
4 changes: 4 additions & 0 deletions mypy-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ posthog/temporal/common/utils.py:0: note: This is likely because "from_activity"
posthog/temporal/common/utils.py:0: error: Argument 2 to "__get__" of "classmethod" has incompatible type "type[HeartbeatType]"; expected "type[Never]" [arg-type]
posthog/warehouse/models/ssh_tunnel.py:0: error: Incompatible types in assignment (expression has type "NoEncryption", variable has type "BestAvailableEncryption") [assignment]
posthog/temporal/data_imports/pipelines/sql_database/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: error: No overload variant of "execute" of "Connection" matches argument types "str", "dict[str, str]" [call-overload]
posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: Possible overload variants:
posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def [_T] execute(self, statement: TypedReturnsRows[_T], parameters: Sequence[Mapping[str, Any]] | Mapping[str, Any] | None = ..., *, execution_options: _CoreKnownExecutionOptions | Mapping[str, Any] | None = ...) -> CursorResult[_T]
posthog/temporal/data_imports/pipelines/sql_database/__init__.py:0: note: def execute(self, statement: Executable, parameters: Sequence[Mapping[str, Any]] | Mapping[str, Any] | None = ..., *, execution_options: _CoreKnownExecutionOptions | Mapping[str, Any] | None = ...) -> CursorResult[Any]
posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Dict entry 2 has incompatible type "Literal['auto']": "None"; expected "Literal['json_response', 'header_link', 'auto', 'single_page', 'cursor', 'offset', 'page_number']": "type[BasePaginator]" [dict-item]
posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "AuthConfigBase") [assignment]
posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Argument 1 to "get_auth_class" has incompatible type "Literal['bearer', 'api_key', 'http_basic'] | None"; expected "Literal['bearer', 'api_key', 'http_basic']" [arg-type]
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/cdp/hog-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ export class HogExecutor {
hogFunctionUrl: invocation.globals.source?.url,
}

status.info('🦔', `[HogExecutor] Executing function`, loggingContext)
status.debug('🦔', `[HogExecutor] Executing function`, loggingContext)

const result: HogFunctionInvocationResult = {
...invocation,
Expand Down
15 changes: 14 additions & 1 deletion posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@
ExternalDataSource,
)
from posthog.temporal.common.logger import bind_temporal_worker_logger
from posthog.warehouse.models.external_data_schema import aupdate_should_sync


Non_Retryable_Schema_Errors = [
"NoSuchTableError",
"401 Client Error: Unauthorized for url: https://api.stripe.com",
"403 Client Error: Forbidden for url: https://api.stripe.com",
]


@dataclasses.dataclass
Expand All @@ -54,6 +62,11 @@ async def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInpu
f"External data job failed for external data schema {inputs.schema_id} with error: {inputs.internal_error}"
)

has_non_retryable_error = any(error in inputs.internal_error for error in Non_Retryable_Schema_Errors)
if has_non_retryable_error:
logger.info("Schema has a non-retryable error - turning off syncing")
await aupdate_should_sync(schema_id=inputs.schema_id, team_id=inputs.team_id, should_sync=False)

await sync_to_async(update_external_job_status)(
run_id=uuid.UUID(inputs.id),
status=inputs.status,
Expand Down Expand Up @@ -177,7 +190,7 @@ async def run(self, inputs: ExternalDataWorkflowInputs):
await workflow.execute_activity(
import_data_activity,
job_inputs,
heartbeat_timeout=dt.timedelta(minutes=1),
heartbeat_timeout=dt.timedelta(minutes=2),
**timeout_params,
) # type: ignore

Expand Down
15 changes: 2 additions & 13 deletions posthog/temporal/data_imports/workflow_activities/import_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from temporalio import activity

from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.data_imports.pipelines.helpers import aremove_reset_pipeline, aupdate_job_count

from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs
Expand All @@ -13,7 +14,6 @@
get_external_data_job,
)
from posthog.temporal.common.logger import bind_temporal_worker_logger
import asyncio
from structlog.typing import FilteringBoundLogger
from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id
from posthog.warehouse.models.ssh_tunnel import SSHTunnel
Expand Down Expand Up @@ -252,22 +252,11 @@ async def _run(
schema: ExternalDataSchema,
reset_pipeline: bool,
):
# Temp background heartbeat for now
async def heartbeat() -> None:
while True:
await asyncio.sleep(10)
activity.heartbeat()

heartbeat_task = asyncio.create_task(heartbeat())

try:
async with Heartbeater():
table_row_counts = await DataImportPipeline(
job_inputs, source, logger, reset_pipeline, schema.is_incremental
).run()
total_rows_synced = sum(table_row_counts.values())

await aupdate_job_count(inputs.run_id, inputs.team_id, total_rows_synced)
await aremove_reset_pipeline(inputs.source_id)
finally:
heartbeat_task.cancel()
await asyncio.wait([heartbeat_task])
93 changes: 93 additions & 0 deletions posthog/temporal/tests/external_data/test_external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,99 @@ async def test_update_external_job_activity(activity_environment, team, **kwargs
assert schema.status == ExternalDataJob.Status.COMPLETED


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_update_external_job_activity_with_retryable_error(activity_environment, team, **kwargs):
new_source = await sync_to_async(ExternalDataSource.objects.create)(
source_id=uuid.uuid4(),
connection_id=uuid.uuid4(),
destination_id=uuid.uuid4(),
team=team,
status="running",
source_type="Stripe",
)

schema = await sync_to_async(ExternalDataSchema.objects.create)(
name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0],
team_id=team.id,
source_id=new_source.pk,
should_sync=True,
)

new_job = await sync_to_async(create_external_data_job)(
team_id=team.id,
external_data_source_id=new_source.pk,
workflow_id=activity_environment.info.workflow_id,
workflow_run_id=activity_environment.info.workflow_run_id,
external_data_schema_id=schema.id,
)

inputs = UpdateExternalDataJobStatusInputs(
id=str(new_job.id),
run_id=str(new_job.id),
status=ExternalDataJob.Status.COMPLETED,
latest_error=None,
internal_error="Some other retryable error",
schema_id=str(schema.pk),
team_id=team.id,
)

await activity_environment.run(update_external_data_job_model, inputs)
await sync_to_async(new_job.refresh_from_db)()
await sync_to_async(schema.refresh_from_db)()

assert new_job.status == ExternalDataJob.Status.COMPLETED
assert schema.status == ExternalDataJob.Status.COMPLETED
assert schema.should_sync is True


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_update_external_job_activity_with_non_retryable_error(activity_environment, team, **kwargs):
new_source = await sync_to_async(ExternalDataSource.objects.create)(
source_id=uuid.uuid4(),
connection_id=uuid.uuid4(),
destination_id=uuid.uuid4(),
team=team,
status="running",
source_type="Stripe",
)

schema = await sync_to_async(ExternalDataSchema.objects.create)(
name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0],
team_id=team.id,
source_id=new_source.pk,
should_sync=True,
)

new_job = await sync_to_async(create_external_data_job)(
team_id=team.id,
external_data_source_id=new_source.pk,
workflow_id=activity_environment.info.workflow_id,
workflow_run_id=activity_environment.info.workflow_run_id,
external_data_schema_id=schema.id,
)

inputs = UpdateExternalDataJobStatusInputs(
id=str(new_job.id),
run_id=str(new_job.id),
status=ExternalDataJob.Status.COMPLETED,
latest_error=None,
internal_error="NoSuchTableError: TableA",
schema_id=str(schema.pk),
team_id=team.id,
)
with mock.patch("posthog.warehouse.models.external_data_schema.external_data_workflow_exists", return_value=False):
await activity_environment.run(update_external_data_job_model, inputs)

await sync_to_async(new_job.refresh_from_db)()
await sync_to_async(schema.refresh_from_db)()

assert new_job.status == ExternalDataJob.Status.COMPLETED
assert schema.status == ExternalDataJob.Status.COMPLETED
assert schema.should_sync is False


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_run_stripe_job(activity_environment, team, minio_client, **kwargs):
Expand Down
Loading

0 comments on commit cfddab8

Please sign in to comment.