Skip to content

Commit

Permalink
typing
Browse files Browse the repository at this point in the history
  • Loading branch information
EDsCODE committed Jan 25, 2024
1 parent e556d48 commit 61586a6
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def create_external_data_job_model(inputs: CreateExternalDataJobInputs) ->
schemas_to_sync = list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[source.source_type])

await sync_to_async(sync_old_schemas_with_new_schemas)( # type: ignore
schemas_to_sync, # type: ignore
schemas_to_sync,
source_id=inputs.external_data_source_id,
team_id=inputs.team_id,
)
Expand Down
4 changes: 2 additions & 2 deletions posthog/temporal/data_imports/pipelines/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlalchemy.engine import Engine

import dlt
from dlt.sources import DltResource
from dlt.sources import DltResource, DltSource


from dlt.sources.credentials import ConnectionStringCredentials
Expand All @@ -21,7 +21,7 @@

def postgres_source(
host: str, port: int, user: str, password: str, database: str, sslmode: str, schema: str, table_names: list[str]
) -> DltResource:
) -> DltSource:
credentials = ConnectionStringCredentials(
f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode={sslmode}"
)
Expand Down
4 changes: 2 additions & 2 deletions posthog/temporal/data_imports/pipelines/postgres/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from dlt.common.typing import TDataItem
from .settings import DEFAULT_CHUNK_SIZE

from sqlalchemy import Table, create_engine
from sqlalchemy import Table, create_engine, Column
from sqlalchemy.engine import Engine
from sqlalchemy.sql import Select

Expand All @@ -34,7 +34,7 @@ def __init__(
self.incremental = incremental
if incremental:
try:
self.cursor_column = table.c[incremental.cursor_path]
self.cursor_column: Optional[Column[Any]] = table.c[incremental.cursor_path]
except KeyError as e:
raise KeyError(
f"Cursor column '{incremental.cursor_path}' does not exist in table '{table.name}'"
Expand Down
2 changes: 1 addition & 1 deletion posthog/warehouse/api/external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
raise NotImplementedError(f"Source type {source_type} not implemented")

if source_type == ExternalDataSource.Type.POSTGRES:
schemas = table_names
schemas = tuple(table_names)
else:
schemas = PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[source_type]

Expand Down
4 changes: 2 additions & 2 deletions posthog/warehouse/models/external_data_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ def sync_old_schemas_with_new_schemas(new_schemas: list, source_id: uuid.UUID, t
ExternalDataSchema.objects.create(name=schema, team_id=team_id, source_id=source_id, should_sync=False)


def get_postgres_schemas(host: str, port: int, database: str, user: str, password: str, sslmode: str, schema: str):
def get_postgres_schemas(host: str, port: str, database: str, user: str, password: str, sslmode: str, schema: str):
connection = psycopg.Connection.connect(
host=host,
port=port,
port=int(port),
dbname=database,
user=user,
password=password,
Expand Down

0 comments on commit 61586a6

Please sign in to comment.