feat(data-warehouse): added sql source v2 #25858

Merged · 23 commits · Oct 31, 2024

@@ -56,6 +56,7 @@
e."$group_0" as aggregation_target,
if(notEmpty(pdi.distinct_id), pdi.person_id, e.person_id) as person_id,
person.person_props as person_props,
person.pmat_email as pmat_email,
if(event = 'step one', 1, 0) as step_0,
if(step_0 = 1, timestamp, null) as latest_0,
if(event = 'step two', 1, 0) as step_1,
@@ -79,6 +80,7 @@
HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id
INNER JOIN
(SELECT id,
argMax(pmat_email, version) as pmat_email,
argMax(properties, version) as person_props
FROM person
WHERE team_id = 99999
@@ -95,7 +97,7 @@
AND event IN ['step one', 'step three', 'step two']
AND toTimeZone(timestamp, 'UTC') >= toDateTime('2021-05-01 00:00:00', 'UTC')
AND toTimeZone(timestamp, 'UTC') <= toDateTime('2021-05-10 23:59:59', 'UTC')
AND ((replaceRegexpAll(JSONExtractRaw(person_props, 'email'), '^"|"$', '') ILIKE '%g0%'
AND (("pmat_email" ILIKE '%g0%'
OR replaceRegexpAll(JSONExtractRaw(person_props, 'name'), '^"|"$', '') ILIKE '%g0%'
OR replaceRegexpAll(JSONExtractRaw(e.properties, 'distinct_id'), '^"|"$', '') ILIKE '%g0%'
OR replaceRegexpAll(JSONExtractRaw(group_properties_0, 'name'), '^"|"$', '') ILIKE '%g0%'
39 changes: 36 additions & 3 deletions mypy-baseline.txt

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions posthog/temporal/data_imports/pipelines/pipeline.py
@@ -96,6 +96,9 @@ def _create_pipeline(self):
pipeline_name = self._get_pipeline_name()
destination = self._get_destination()

dlt.config["normalize.parquet_normalizer.add_dlt_load_id"] = True
dlt.config["normalize.parquet_normalizer.add_dlt_id"] = True

return dlt.pipeline(
pipeline_name=pipeline_name, destination=destination, dataset_name=self.inputs.dataset_name, progress="log"
)
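
For context, the two dlt.config flags added above belong to dlt's parquet normalizer; with both enabled, dlt includes its _dlt_load_id and _dlt_id lineage columns in the normalized parquet output. A minimal standalone sketch of the same settings (the resource, pipeline name, and duckdb destination below are illustrative and not part of this PR):

import dlt

dlt.config["normalize.parquet_normalizer.add_dlt_load_id"] = True  # add _dlt_load_id to parquet output
dlt.config["normalize.parquet_normalizer.add_dlt_id"] = True  # add _dlt_id to parquet output


@dlt.resource(name="events")
def events():
    # Tiny in-memory resource purely for demonstration
    yield [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]


pipeline = dlt.pipeline(pipeline_name="sql_source_demo", destination="duckdb", dataset_name="demo", progress="log")
info = pipeline.run(events(), loader_file_format="parquet")
print(info)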
19 changes: 0 additions & 19 deletions posthog/temporal/data_imports/pipelines/rest_source/__init__.py
@@ -372,22 +372,3 @@ def identity_func(x: Any) -> Any:
if incremental_param.end:
params[incremental_param.end] = transform(incremental_object.end_value)
return params


# XXX: This is a workaround to pass test_dlt_init.py
# since the source uses dlt.source as a function
def _register_source(source_func: Callable[..., DltSource]) -> None:
import inspect
from dlt.common.configuration import get_fun_spec
from dlt.common.source import _SOURCES, SourceInfo

spec = get_fun_spec(source_func)
func_module = inspect.getmodule(source_func)
_SOURCES[source_func.__name__] = SourceInfo(
SPEC=spec,
f=source_func,
module=func_module,
)


_register_source(rest_api_source)
407 changes: 407 additions & 0 deletions posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions posthog/temporal/data_imports/pipelines/sql_database_v2/_json.py
@@ -0,0 +1,32 @@
import json
from sqlalchemy import String, TypeDecorator


class BigQueryJSON(TypeDecorator):
"""
SQLAlchemy 2.0 compatible JSON type for BigQuery

This implementation uses STRING as the underlying type since
that's how BigQuery stores JSON.
"""

impl = String
cache_ok = True

def __init__(self, none_as_null: bool = False) -> None:
super().__init__()
self.none_as_null = none_as_null

# Add these for BigQuery dialect compatibility
self._json_serializer = json.dumps
self._json_deserializer = json.loads

def process_bind_param(self, value, dialect):
if value is None:
return None if self.none_as_null else "null"
return self._json_deserializer(value)

def process_result_value(self, value, dialect):
if value is None:
return None
return self._json_serializer(value)
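
As a rough usage sketch (the table and column names below are hypothetical, not taken from the PR), a TypeDecorator like BigQueryJSON is attached to a SQLAlchemy Column so JSON payloads round-trip through BigQuery's STRING storage:

from sqlalchemy import Column, Integer, MetaData, Table

from posthog.temporal.data_imports.pipelines.sql_database_v2._json import BigQueryJSON

metadata = MetaData()

events = Table(
    "events",
    metadata,
    Column("id", Integer, primary_key=True),
    # Values pass through process_bind_param / process_result_value defined above
    Column("properties", BigQueryJSON(none_as_null=True)),
)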
@@ -0,0 +1,136 @@
from typing import Any, Optional
from collections.abc import Sequence

from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common import logger, json
from dlt.common.configuration import with_config
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.json import custom_encode, map_nested_in_place

from .schema_types import RowAny


@with_config
def columns_to_arrow(
columns_schema: TTableSchemaColumns,
caps: Optional[DestinationCapabilitiesContext] = None,
tz: str = "UTC",
) -> Any:
"""Converts `column_schema` to arrow schema using `caps` and `tz`. `caps` are injected from the container - which
is always the case if run within the pipeline. This will generate arrow schema compatible with the destination.
Otherwise generic capabilities are used
"""
from dlt.common.libs.pyarrow import pyarrow as pa, get_py_arrow_datatype
from dlt.common.destination.capabilities import DestinationCapabilitiesContext

return pa.schema(
[
pa.field(
name,
get_py_arrow_datatype(
schema_item,
caps or DestinationCapabilitiesContext.generic_capabilities(),
tz,
),
nullable=schema_item.get("nullable", True),
)
for name, schema_item in columns_schema.items()
if schema_item.get("data_type") is not None
]
)


def row_tuples_to_arrow(rows: Sequence[RowAny], columns: TTableSchemaColumns, tz: str) -> Any:
"""Converts the rows to an arrow table using the columns schema.
Columns missing `data_type` will be inferred from the row data.
Columns with object types not supported by arrow are excluded from the resulting table.
"""
from dlt.common.libs.pyarrow import pyarrow as pa
import numpy as np

try:
from pandas._libs import lib

pivoted_rows = lib.to_object_array_tuples(rows).T # type: ignore[attr-defined]
except ImportError:
logger.info("Pandas not installed, reverting to numpy.asarray to create a table which is slower")
pivoted_rows = np.asarray(rows, dtype="object", order="k").T # type: ignore[call-overload]

columnar = {col: dat.ravel() for col, dat in zip(columns, np.vsplit(pivoted_rows, len(columns)))}
columnar_known_types = {
col["name"]: columnar[col["name"]] for col in columns.values() if col.get("data_type") is not None
}
columnar_unknown_types = {
col["name"]: columnar[col["name"]] for col in columns.values() if col.get("data_type") is None
}

arrow_schema = columns_to_arrow(columns, tz=tz)

for idx in range(0, len(arrow_schema.names)):
field = arrow_schema.field(idx)
py_type: type = type(None)
for row in rows:
val = row[idx]
if val is not None:
py_type = type(val)
break

# cast double / float ndarrays to decimals if type mismatch, looks like decimals and floats are often mixed up in dialects
if pa.types.is_decimal(field.type) and issubclass(py_type, str | float):
logger.warning(
f"Field {field.name} was reflected as decimal type, but rows contains {py_type.__name__}. Additional cast is required which may slow down arrow table generation."
)
float_array = pa.array(columnar_known_types[field.name], type=pa.float64())
columnar_known_types[field.name] = float_array.cast(field.type, safe=False)
if issubclass(py_type, dict | list):
logger.warning(
f"Field {field.name} was reflected as JSON type and needs to be serialized back to string to be placed in arrow table. This will slow data extraction down. You should cast JSON field to STRING in your database system ie. by creating and extracting an SQL VIEW that selects with cast."
)
json_str_array = pa.array([None if s is None else json.dumps(s) for s in columnar_known_types[field.name]])
columnar_known_types[field.name] = json_str_array

# If there are unknown type columns, first create a table to infer their types
if columnar_unknown_types:
new_schema_fields = []
for key in list(columnar_unknown_types):
arrow_col: Optional[pa.Array] = None
try:
arrow_col = pa.array(columnar_unknown_types[key])
if pa.types.is_null(arrow_col.type):
logger.warning(
f"Column {key} contains only NULL values and data type could not be inferred. This column is removed from a arrow table"
)
continue

except pa.ArrowInvalid as e:
# Try coercing types not supported by arrow to a json friendly format
# E.g. dataclasses -> dict, UUID -> str
try:
arrow_col = pa.array(map_nested_in_place(custom_encode, list(columnar_unknown_types[key])))
logger.warning(
f"Column {key} contains a data type which is not supported by pyarrow and got converted into {arrow_col.type}. This slows down arrow table generation."
)
except (pa.ArrowInvalid, TypeError):
logger.warning(
f"Column {key} contains a data type which is not supported by pyarrow. This column will be ignored. Error: {e}"
)
if arrow_col is not None:
columnar_known_types[key] = arrow_col
new_schema_fields.append(
pa.field(
key,
arrow_col.type,
nullable=columns[key]["nullable"],
)
)

# New schema
column_order = {name: idx for idx, name in enumerate(columns)}
arrow_schema = pa.schema(
sorted(
list(arrow_schema) + new_schema_fields,
key=lambda x: column_order[x.name],
)
)

return pa.Table.from_pydict(columnar_known_types, schema=arrow_schema)
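
A hedged usage sketch of row_tuples_to_arrow (the schema, rows, and import path are invented for illustration; "bigint" and "text" are assumed to be valid dlt column data types): columns with a declared data_type follow the destination-aware Arrow schema, while the untyped payload column is inferred from the row values.

# Module path is an assumption for this sketch.
from posthog.temporal.data_imports.pipelines.sql_database_v2.arrow_helpers import row_tuples_to_arrow

rows = [
    (1, "a@example.com", {"plan": "free"}),
    (2, None, {"plan": "paid", "seats": 3}),
]
columns = {
    "id": {"name": "id", "data_type": "bigint", "nullable": False},
    "email": {"name": "email", "data_type": "text", "nullable": True},
    "payload": {"name": "payload", "nullable": True},  # no data_type: inferred from the rows
}

table = row_tuples_to_arrow(rows, columns, tz="UTC")
print(table.schema)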