Skip to content

Commit

Permalink
feat(data-warehouse): added sql source v2 (#25858)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Michael Matloka <[email protected]>
  • Loading branch information
3 people authored Oct 31, 2024
1 parent 8b4387c commit 5647c55
Show file tree
Hide file tree
Showing 16 changed files with 1,124 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
e."$group_0" as aggregation_target,
if(notEmpty(pdi.distinct_id), pdi.person_id, e.person_id) as person_id,
person.person_props as person_props,
person.pmat_email as pmat_email,
if(event = 'step one', 1, 0) as step_0,
if(step_0 = 1, timestamp, null) as latest_0,
if(event = 'step two', 1, 0) as step_1,
Expand All @@ -79,6 +80,7 @@
HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id
INNER JOIN
(SELECT id,
argMax(pmat_email, version) as pmat_email,
argMax(properties, version) as person_props
FROM person
WHERE team_id = 99999
Expand All @@ -95,7 +97,7 @@
AND event IN ['step one', 'step three', 'step two']
AND toTimeZone(timestamp, 'UTC') >= toDateTime('2021-05-01 00:00:00', 'UTC')
AND toTimeZone(timestamp, 'UTC') <= toDateTime('2021-05-10 23:59:59', 'UTC')
AND ((replaceRegexpAll(JSONExtractRaw(person_props, 'email'), '^"|"$', '') ILIKE '%g0%'
AND (("pmat_email" ILIKE '%g0%'
OR replaceRegexpAll(JSONExtractRaw(person_props, 'name'), '^"|"$', '') ILIKE '%g0%'
OR replaceRegexpAll(JSONExtractRaw(e.properties, 'distinct_id'), '^"|"$', '') ILIKE '%g0%'
OR replaceRegexpAll(JSONExtractRaw(group_properties_0, 'name'), '^"|"$', '') ILIKE '%g0%'
Expand Down
39 changes: 36 additions & 3 deletions mypy-baseline.txt

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions posthog/temporal/data_imports/pipelines/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ def _create_pipeline(self):
pipeline_name = self._get_pipeline_name()
destination = self._get_destination()

dlt.config["normalize.parquet_normalizer.add_dlt_load_id"] = True
dlt.config["normalize.parquet_normalizer.add_dlt_id"] = True

return dlt.pipeline(
pipeline_name=pipeline_name, destination=destination, dataset_name=self.inputs.dataset_name, progress="log"
)
Expand Down
19 changes: 0 additions & 19 deletions posthog/temporal/data_imports/pipelines/rest_source/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,22 +372,3 @@ def identity_func(x: Any) -> Any:
if incremental_param.end:
params[incremental_param.end] = transform(incremental_object.end_value)
return params


# XXX: This is a workaround pass test_dlt_init.py
# since the source uses dlt.source as a function
def _register_source(source_func: Callable[..., DltSource]) -> None:
import inspect
from dlt.common.configuration import get_fun_spec
from dlt.common.source import _SOURCES, SourceInfo

spec = get_fun_spec(source_func)
func_module = inspect.getmodule(source_func)
_SOURCES[source_func.__name__] = SourceInfo(
SPEC=spec,
f=source_func,
module=func_module,
)


_register_source(rest_api_source)
407 changes: 407 additions & 0 deletions posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions posthog/temporal/data_imports/pipelines/sql_database_v2/_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import json
from sqlalchemy import String, TypeDecorator


class BigQueryJSON(TypeDecorator):
"""
SQLAlchemy 2.0 compatible JSON type for BigQuery
This implementation uses STRING as the underlying type since
that's how BigQuery stores JSON.
"""

impl = String
cache_ok = True

def __init__(self, none_as_null: bool = False) -> None:
super().__init__()
self.none_as_null = none_as_null

# Add these for BigQuery dialect compatibility
self._json_serializer = json.dumps
self._json_deserializer = json.loads

def process_bind_param(self, value, dialect):
if value is None:
return None if self.none_as_null else "null"
return self._json_deserializer(value)

def process_result_value(self, value, dialect):
if value is None:
return None
return self._json_serializer(value)
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
from typing import Any, Optional
from collections.abc import Sequence

from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common import logger, json
from dlt.common.configuration import with_config
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.json import custom_encode, map_nested_in_place

from .schema_types import RowAny


@with_config
def columns_to_arrow(
columns_schema: TTableSchemaColumns,
caps: Optional[DestinationCapabilitiesContext] = None,
tz: str = "UTC",
) -> Any:
"""Converts `column_schema` to arrow schema using `caps` and `tz`. `caps` are injected from the container - which
is always the case if run within the pipeline. This will generate arrow schema compatible with the destination.
Otherwise generic capabilities are used
"""
from dlt.common.libs.pyarrow import pyarrow as pa, get_py_arrow_datatype
from dlt.common.destination.capabilities import DestinationCapabilitiesContext

return pa.schema(
[
pa.field(
name,
get_py_arrow_datatype(
schema_item,
caps or DestinationCapabilitiesContext.generic_capabilities(),
tz,
),
nullable=schema_item.get("nullable", True),
)
for name, schema_item in columns_schema.items()
if schema_item.get("data_type") is not None
]
)


def row_tuples_to_arrow(rows: Sequence[RowAny], columns: TTableSchemaColumns, tz: str) -> Any:
"""Converts the rows to an arrow table using the columns schema.
Columns missing `data_type` will be inferred from the row data.
Columns with object types not supported by arrow are excluded from the resulting table.
"""
from dlt.common.libs.pyarrow import pyarrow as pa
import numpy as np

try:
from pandas._libs import lib

pivoted_rows = lib.to_object_array_tuples(rows).T # type: ignore[attr-defined]
except ImportError:
logger.info("Pandas not installed, reverting to numpy.asarray to create a table which is slower")
pivoted_rows = np.asarray(rows, dtype="object", order="k").T # type: ignore[call-overload]

columnar = {col: dat.ravel() for col, dat in zip(columns, np.vsplit(pivoted_rows, len(columns)))}
columnar_known_types = {
col["name"]: columnar[col["name"]] for col in columns.values() if col.get("data_type") is not None
}
columnar_unknown_types = {
col["name"]: columnar[col["name"]] for col in columns.values() if col.get("data_type") is None
}

arrow_schema = columns_to_arrow(columns, tz=tz)

for idx in range(0, len(arrow_schema.names)):
field = arrow_schema.field(idx)
py_type: type = type(None)
for row in rows:
val = row[idx]
if val is not None:
py_type = type(val)
break

# cast double / float ndarrays to decimals if type mismatch, looks like decimals and floats are often mixed up in dialects
if pa.types.is_decimal(field.type) and issubclass(py_type, str | float):
logger.warning(
f"Field {field.name} was reflected as decimal type, but rows contains {py_type.__name__}. Additional cast is required which may slow down arrow table generation."
)
float_array = pa.array(columnar_known_types[field.name], type=pa.float64())
columnar_known_types[field.name] = float_array.cast(field.type, safe=False)
if issubclass(py_type, dict | list):
logger.warning(
f"Field {field.name} was reflected as JSON type and needs to be serialized back to string to be placed in arrow table. This will slow data extraction down. You should cast JSON field to STRING in your database system ie. by creating and extracting an SQL VIEW that selects with cast."
)
json_str_array = pa.array([None if s is None else json.dumps(s) for s in columnar_known_types[field.name]])
columnar_known_types[field.name] = json_str_array

# If there are unknown type columns, first create a table to infer their types
if columnar_unknown_types:
new_schema_fields = []
for key in list(columnar_unknown_types):
arrow_col: Optional[pa.Array] = None
try:
arrow_col = pa.array(columnar_unknown_types[key])
if pa.types.is_null(arrow_col.type):
logger.warning(
f"Column {key} contains only NULL values and data type could not be inferred. This column is removed from a arrow table"
)
continue

except pa.ArrowInvalid as e:
# Try coercing types not supported by arrow to a json friendly format
# E.g. dataclasses -> dict, UUID -> str
try:
arrow_col = pa.array(map_nested_in_place(custom_encode, list(columnar_unknown_types[key])))
logger.warning(
f"Column {key} contains a data type which is not supported by pyarrow and got converted into {arrow_col.type}. This slows down arrow table generation."
)
except (pa.ArrowInvalid, TypeError):
logger.warning(
f"Column {key} contains a data type which is not supported by pyarrow. This column will be ignored. Error: {e}"
)
if arrow_col is not None:
columnar_known_types[key] = arrow_col
new_schema_fields.append(
pa.field(
key,
arrow_col.type,
nullable=columns[key]["nullable"],
)
)

# New schema
column_order = {name: idx for idx, name in enumerate(columns)}
arrow_schema = pa.schema(
sorted(
list(arrow_schema) + new_schema_fields,
key=lambda x: column_order[x.name],
)
)

return pa.Table.from_pydict(columnar_known_types, schema=arrow_schema)
Loading

0 comments on commit 5647c55

Please sign in to comment.