Commit

fix tests and imports
EDsCODE committed Dec 20, 2023
1 parent 1bb4af7 commit d94ffa6
Showing 6 changed files with 28 additions and 17 deletions.
10 changes: 8 additions & 2 deletions posthog/temporal/data_imports/external_data_job.py
@@ -11,6 +11,7 @@
 from posthog.temporal.batch_exports.base import PostHogWorkflow

 from posthog.warehouse.data_load.validate_schema import validate_schema_and_update_table
+from posthog.temporal.data_imports.pipelines.schemas import PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING
 from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs
 from posthog.warehouse.external_data_source.jobs import (
     create_external_data_job,
@@ -141,13 +142,18 @@ async def run_external_data_job(inputs: ExternalDataJobInputs) -> None:
         dataset_name=model.folder_path,
     )

-    if model.pipeline.source_type == 'stripe':
+    source = None
+    if model.pipeline.source_type == ExternalDataSource.Type.STRIPE:
         from posthog.temporal.data_imports.pipelines.stripe.helpers import stripe_source
-        stripe_secret_key = model.pipeline.job_inputs.get('stripe_secret_key', None)
+
+        stripe_secret_key = model.pipeline.job_inputs.get("stripe_secret_key", None)
         if not stripe_secret_key:
             raise ValueError(f"Stripe secret key not found for job {model.id}")
         source = stripe_source(api_key=stripe_secret_key, endpoints=inputs.schemas)

+    if not source:
+        raise ValueError(f"Source not found for job {model.id}")
+
     await DataImportPipeline(job_inputs, source, logger).run()


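The hunk above replaces the hard-coded 'stripe' string with the ExternalDataSource.Type enum and adds a guard so the activity fails fast when no source type matches. Below is a minimal sketch of that dispatch-and-guard pattern; SourceType, build_stripe_source and resolve_source are stand-in names for illustration, not PostHog's real models or helpers.

from enum import Enum


class SourceType(str, Enum):
    # Stand-in for ExternalDataSource.Type; the value is illustrative.
    STRIPE = "Stripe"


def build_stripe_source(api_key: str, endpoints: list) -> dict:
    # Placeholder for stripe_source(...); just bundles the config it would need.
    return {"api_key": api_key, "endpoints": endpoints}


def resolve_source(source_type: SourceType, job_inputs: dict, job_id: str, schemas: list):
    """Pick a concrete source for the job, or raise if nothing matches."""
    source = None
    if source_type == SourceType.STRIPE:
        stripe_secret_key = job_inputs.get("stripe_secret_key", None)
        if not stripe_secret_key:
            raise ValueError(f"Stripe secret key not found for job {job_id}")
        source = build_stripe_source(api_key=stripe_secret_key, endpoints=schemas)

    if not source:
        raise ValueError(f"Source not found for job {job_id}")
    return source


print(resolve_source(SourceType.STRIPE, {"stripe_secret_key": "sk_test_xxx"}, "job-1", ["Customer"]))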
21 changes: 11 additions & 10 deletions posthog/temporal/data_imports/pipelines/pipeline.py
@@ -11,6 +11,7 @@
 from structlog.typing import FilteringBoundLogger
 from dlt.sources import DltResource

+
 @dataclass
 class PipelineInputs:
     source_id: UUID
@@ -40,14 +41,14 @@ def _get_destination(self):
             credentials = {
                 "aws_access_key_id": settings.AIRBYTE_BUCKET_KEY,
                 "aws_secret_access_key": settings.AIRBYTE_BUCKET_SECRET,
-                "endpoint_url": settings.OBJECT_STORAGE_ENDPOINT
+                "endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
             }
         else:
-            credentials = {
-                "aws_access_key_id": settings.AIRBYTE_BUCKET_KEY,
-                "aws_secret_access_key": settings.AIRBYTE_BUCKET_SECRET,
-            }
-
+            credentials = {
+                "aws_access_key_id": settings.AIRBYTE_BUCKET_KEY,
+                "aws_secret_access_key": settings.AIRBYTE_BUCKET_SECRET,
+            }

         return dlt.destinations.filesystem(
             credentials=credentials,
             bucket_url=settings.BUCKET_URL,  # type: ignore
@@ -69,9 +70,9 @@ def _get_schemas(self):
         if not self.inputs.schemas:
             self.logger.info(f"No schemas found for source id {self.inputs.source_id}")
             return None

         return self.inputs.schemas

     def _run(self):
         pipeline = self._create_pipeline()
         pipeline.run(self.source, loader_file_format=self.loader_file_format)
@@ -85,4 +86,4 @@ async def run(self) -> None:
             await asyncio.to_thread(self._run)
         except PipelineStepFailed:
             self.logger.error(f"Data import failed for endpoint")
-            raise
+            raise
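For context on the destination hunk above: the credentials dict is plain S3-style configuration handed to dlt's filesystem destination, and endpoint_url is only set when writing to an S3-compatible store (for example MinIO in local or test setups). A small sketch of that shape, assuming the dlt package is installed; the helper name and parameters are illustrative, not part of the repository.

from typing import Optional

import dlt


def make_filesystem_destination(bucket_url: str, key: str, secret: str, endpoint_url: Optional[str] = None):
    credentials = {
        "aws_access_key_id": key,
        "aws_secret_access_key": secret,
    }
    if endpoint_url:
        # Only needed for S3-compatible stores; plain AWS S3 works without it.
        credentials["endpoint_url"] = endpoint_url

    # Same call shape as in the diff: a filesystem destination pointed at a bucket.
    return dlt.destinations.filesystem(credentials=credentials, bucket_url=bucket_url)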
4 changes: 4 additions & 0 deletions posthog/temporal/data_imports/pipelines/schemas.py
@@ -0,0 +1,4 @@
+from posthog.warehouse.models import ExternalDataSource
+from posthog.temporal.data_imports.pipelines.stripe.settings import ENDPOINTS
+
+PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {ExternalDataSource.Type.STRIPE: ENDPOINTS}
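The new schemas.py gives each source type its default schema names in one importable place, which is what the API and tests below switch to. A sketch of how such a mapping is typically consumed when seeding schemas for a newly created source; the keys and endpoint names here are stand-ins, not the real ENDPOINTS tuple from stripe/settings.py.

ENDPOINTS = ("Customer", "Invoice", "Subscription")  # illustrative subset only

PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {"Stripe": ENDPOINTS}


def default_schema_names(source_type: str) -> tuple:
    # Unknown source types simply get no default schemas.
    return PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING.get(source_type, ())


for name in default_schema_names("Stripe"):
    print(f"would create ExternalDataSchema(name={name!r})")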
6 changes: 3 additions & 3 deletions posthog/temporal/tests/test_external_data_job.py
@@ -28,10 +28,10 @@
     ExternalDataSchema,
 )

-from posthog.temporal.data_imports.pipelines.stripe.stripe_pipeline import (
-    PIPELINE_TYPE_RUN_MAPPING,
+from posthog.temporal.data_imports.pipelines.schemas import (
+    PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING,
 )
 from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline
 from temporalio.testing import WorkflowEnvironment
 from temporalio.common import RetryPolicy
 from temporalio.worker import UnsandboxedWorkflowRunner, Worker
@@ -449,7 +449,7 @@ async def mock_async_func(inputs):

     with mock.patch(
         "posthog.warehouse.models.table.DataWarehouseTable.get_columns", return_value={"id": "string"}
-    ), mock.patch.dict(PIPELINE_TYPE_RUN_MAPPING, {ExternalDataSource.Type.STRIPE: mock_async_func}):
+    ), mock.patch.object(DataImportPipeline, "run", mock_async_func):
         with override_settings(AIRBYTE_BUCKET_KEY="test-key", AIRBYTE_BUCKET_SECRET="test-secret"):
             async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
                 async with Worker(
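The test now stubs DataImportPipeline.run directly instead of swapping an entry in the removed PIPELINE_TYPE_RUN_MAPPING. A self-contained sketch of that patch.object pattern for replacing an async method during a test; Pipeline and fake_run are stand-in names, not the real classes.

import asyncio
from unittest import mock


class Pipeline:
    async def run(self) -> str:
        return "real work"


async def fake_run(self) -> str:
    # Substituted for Pipeline.run while the patch context is active.
    return "stubbed"


async def main() -> None:
    with mock.patch.object(Pipeline, "run", fake_run):
        assert await Pipeline().run() == "stubbed"
    # Outside the context the original method is restored automatically.
    assert await Pipeline().run() == "real work"


asyncio.run(main())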
2 changes: 1 addition & 1 deletion posthog/warehouse/api/external_data_source.py
@@ -21,7 +21,7 @@
 )
 from posthog.warehouse.models import ExternalDataSource, ExternalDataSchema, ExternalDataJob
 from posthog.warehouse.api.external_data_schema import ExternalDataSchemaSerializer
-from posthog.temporal.data_imports.pipelines.stripe.stripe_pipeline import (
+from posthog.temporal.data_imports.pipelines.schemas import (
     PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING,
 )

2 changes: 1 addition & 1 deletion posthog/warehouse/api/test/test_external_data_source.py
@@ -2,7 +2,7 @@
 from posthog.warehouse.models import ExternalDataSource, ExternalDataSchema
 import uuid
 from unittest.mock import patch
-from posthog.temporal.data_imports.pipelines.stripe.stripe_pipeline import (
+from posthog.temporal.data_imports.pipelines.schemas import (
     PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING,
 )

