Add Zyp Transformations, a minimal transformation engine
A data model and implementation for a compact transformation engine written
in Python.

- Based on JSON Pointer (RFC 6901), JMESPath, and transon
- Implemented using `attrs` and `cattrs`
- Includes built-in transformation functions `to_datetime` and
  `to_unixtime`
- Ability to marshal and unmarshal its representation to/from JSON and YAML
amotl committed Aug 11, 2024
1 parent e9f17e9 commit 98cac09
Showing 20 changed files with 1,471 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -104,7 +104,7 @@ jobs:
pip install "setuptools>=64" --upgrade
# Install package in editable mode.
-pip install --use-pep517 --prefer-binary --editable=.[develop,test,mongodb]
+pip install --use-pep517 --prefer-binary --editable=.[all,develop,test]
- name: Run linters and software tests
run: poe check
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- Added `BucketTransformation`, a minimal transformation engine
based on JSON Pointer (RFC 6901).

## 2024/08/05 v0.0.3
- Added transformer for AWS DMS to CrateDB SQL
3 changes: 3 additions & 0 deletions docs/backlog.md
@@ -12,3 +12,6 @@
- [ ] MongoDB: Implement stream resumption using `start_after`
- [ ] Feature: Filter by events, e.g. Ignore "delete" events?
- [ ] Integration Testing the "example" programs?
- [ ] Improve capabilities of DMS translator
https://github.com/daq-tools/commons-codec/issues/11
- https://github.com/supabase/pg_replicate
46 changes: 46 additions & 0 deletions docs/zyp/backlog.md
@@ -0,0 +1,46 @@
# Zyp Backlog

## Iteration +1
- Refactor module namespace to `zyp`
- Documentation
- CLI interface
- Apply to MongoDB Table Loader in CrateDB Toolkit

## Iteration +2
Demonstrate!
- math expressions
- omit key (recursively)
- combine keys
- filter on keys and/or values
- Pathological cases like "Not defined" in typed fields like `TIMESTAMP`
- Use simpleeval, like Meltano, and provide the same built-in functions
- https://sdk.meltano.com/en/v0.39.1/stream_maps.html#other-built-in-functions-and-names
- https://github.com/MeltanoLabs/meltano-map-transform/pull/255
- https://github.com/MeltanoLabs/meltano-map-transform/issues/252
- Use JSONPath, see https://sdk.meltano.com/en/v0.39.1/code_samples.html#use-a-jsonpath-expression-to-extract-the-next-page-url-from-a-hateoas-response

## Iteration +3
- Moksha transformations on Buckets
- Investigate using JSON Schema
- Fluent API interface
- https://github.com/Halvani/alphabetic
- Mappers do not support external API lookups.
To add external API lookups, you can either (a) land all your data and
then perform joins using a transformation tool like dbt, or (b) create a custom
mapper plugin with inline lookup logic.
=> Example from Luftdatenpumpe, using a reverse geocoder
- [ ] Define schema
https://sdk.meltano.com/en/latest/typing.html
- https://docs.meltano.com/guide/v2-migration/#migrate-to-an-adapter-specific-dbt-transformer
- https://github.com/meltano/sdk/blob/v0.39.1/singer_sdk/mapper.py

## Fluent API Interface
```python
from commons_codec.transform.model import FluentTransformation

transformation = FluentTransformation() \
    .jmes("records[?starts_with(location, 'B')]") \
    .rename_fields({"_id": "id"}) \
    .convert_values({"/id": "int", "/value": "float"}, type="pointer-python") \
    .jq(".[] |= (.value /= 100)")
```
182 changes: 182 additions & 0 deletions docs/zyp/index.md
@@ -0,0 +1,182 @@
# Zyp Transformations

## About
A data model and implementation for a compact transformation engine written
in [Python], based on [JSON Pointer] (RFC 6901), [JMESPath], and [transon],
implemented using [attrs] and [cattrs].

## Ideas
:Conciseness:
Define a multistep data refinement process with as little code as possible.
:Low Footprint:
Doesn't need any infrastructure or pipeline framework. It's just a little library.
:Interoperability:
Marshal transformation recipe definition to/from text-only representations (JSON,
YAML), in order to encourage implementations in other languages.
:Performance:
Well, it is written in Python. Fragments can be rewritten in Rust where applicable.
:Immediate:
Other ETL frameworks and concepts often need to first land your data in the target
system before applying subsequent transformations. Zyp works directly within
the data pipeline, before data is inserted into the target system.

## Synopsis I
A basic transformation example for individual data records.
```python
from commons_codec.transform.model import *

# Consider a slightly messy collection of records.
data_in = [
    {"_id": "123", "name": "device-foo", "reading": "42.42"},
    {"_id": "456", "name": "device-bar", "reading": -84.01},
]
]

# Define a transformation that renames the `_id` field to `id`,
# casts its value to `int`, and casts the `reading` field to `float`.
transformation = BucketTransformation(
    names=FieldRenamer().add(old="_id", new="id"),
    values=ValueConverter()
    .add(pointer="/id", transformer="builtins.int")
    .add(pointer="/reading", transformer="builtins.float"),
)

for record in data_in:
    print(transformation.apply(record))
```
The result is a transformed data collection.
```json
[
{"id": 123, "name": "device-foo", "reading": 42.42},
{"id": 456, "name": "device-bar", "reading": -84.01}
]
```
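The transformation recipe itself marshals to a text-only representation. A minimal
sketch, assuming a `to_json` method as the JSON counterpart of the `to_yaml` method
demonstrated in Synopsis II below:
```python
# Sketch only: `to_json` is assumed by analogy with `to_yaml`.
print(transformation.to_json())
```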

## Synopsis II
A more advanced transformation example for a collection of data records.

Consider a messy collection of input data.
- The actual collection is nested within the top-level `records` item.
- `_id` fields are conveyed in string format.
- `value` fields include both integer and string values.
- `value` fields are fixed-point values, using a scaling factor of `100`.
- The collection includes invalid `null` records.
  Such records usually trip up processing, for example when filtering on object items.
```python
data_in = {
    "message-source": "system-3000",
    "message-type": "eai-warehouse",
    "records": [
        {"_id": "12", "meta": {"name": "foo", "location": "B"}, "data": {"value": "4242"}},
        None,
        {"_id": "34", "meta": {"name": "bar", "location": "BY"}, "data": {"value": -8401}},
        {"_id": "56", "meta": {"name": "baz", "location": "NI"}, "data": {"value": 2323}},
        {"_id": "78", "meta": {"name": "qux", "location": "NRW"}, "data": {"value": -580}},
        None,
        None,
    ],
}
```

After applying a corresponding transformation, the expected outcome is a
collection of valid records, optionally filtered, with values adjusted according
to relevant type hints and other conversions.
```python
data_out = [
    {"id": 12, "meta": {"name": "foo", "location": "B"}, "data": {"value": 42.42}},
    {"id": 34, "meta": {"name": "bar", "location": "BY"}, "data": {"value": -84.01}},
]
```

Let's come up with relevant pre-processing rules to cleanse and mangle the shape of the
input collection. In order to make this example more exciting, let's include two special
needs:
- Filter input collection by value of nested element.
- Rename top-level fields starting with underscore `_`.

Other than those special rules, the fundamental ones to re-shape the data are:
- Unwrap `records` attribute from container dictionary into actual collection.
- Filter collection, both by omitting invalid/empty records, and by applying query
  constraints.
- On each record, rename the top-level `_id` field to `id`.
- On each record, adjust the data types of the `id` and `value` fields.
- Postprocess collection, applying a custom scaling factor to the `value` field.

Zyp lets you concisely write those rules down, using the Python language.
```python
from commons_codec.transform.model import *

transformation = CollectionTransformation(
    pre=MokshaTransformation().jmes("records[?not_null(meta.location) && !starts_with(meta.location, 'N')]"),
    bucket=BucketTransformation(
        names=FieldRenamer().add(old="_id", new="id"),
        values=ValueConverter()
        .add(pointer="/id", transformer="builtins.int")
        .add(pointer="/data/value", transformer="builtins.float"),
    ),
    post=MokshaTransformation().jq(".[] |= (.data.value /= 100)"),
)

data_out = transformation.apply(data_in)
```
Alternatively, serialize the `zyp-collection` transformation description,
for example into YAML format.
```python
print(transformation.to_yaml())
```
```yaml
meta:
  version: 1
  type: zyp-collection
pre:
  rules:
  - expression: records[?not_null(meta.location) && !starts_with(meta.location, 'N')]
    type: jmes
bucket:
  names:
    rules:
    - new: id
      old: _id
  values:
    rules:
    - args: []
      pointer: /id
      transformer: builtins.int
    - args: []
      pointer: /data/value
      transformer: builtins.float
post:
  rules:
  - expression: .[] |= (.data.value /= 100)
    type: jq
```
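Conversely, a serialized recipe can be unmarshalled back into a transformation
object and applied as before. A minimal sketch, assuming a `from_yaml` constructor
as the inverse of `to_yaml`:
```python
from commons_codec.transform.model import CollectionTransformation

# Sketch only: `from_yaml` is assumed as the unmarshalling
# counterpart of `to_yaml`.
restored = CollectionTransformation.from_yaml(transformation.to_yaml())
data_out = restored.apply(data_in)
```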
## Prior Art
- [Singer Transformer]
- [PipelineWise Transformations]
- [singer-transform]
- [Meltano Inline Data Mapping]
- [Meltano Inline Stream Maps]
- [AWS DMS source filter rules]
- [AWS DMS table selection and transformation rules]
- ... and many more. Thanks for the inspirations.
## Etymology
With kudos to [Kris Zyp] for conceiving [JSON Pointer].
[attrs]: https://www.attrs.org/
[AWS DMS source filter rules]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.Filters.html
[AWS DMS table selection and transformation rules]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.html
[cattrs]: https://catt.rs/
[Kris Zyp]: https://github.com/kriszyp
[JMESPath]: https://jmespath.org/
[JSON Pointer]: https://datatracker.ietf.org/doc/html/rfc6901
[Meltano Inline Data Mapping]: https://docs.meltano.com/guide/mappers/
[Meltano Inline Stream Maps]: https://sdk.meltano.com/en/latest/stream_maps.html
[PipelineWise Transformations]: https://transferwise.github.io/pipelinewise/user_guide/transformations.html
[Python]: https://en.wikipedia.org/wiki/Python_(programming_language)
[Singer Transformer]: https://github.com/singer-io/singer-python/blob/master/singer/transform.py
[singer-transform]: https://github.com/dkarzon/singer-transform
[transon]: https://transon-org.github.io/
24 changes: 24 additions & 0 deletions docs/zyp/research.md
@@ -0,0 +1,24 @@
# Transformer Ingredients Research

## Toolbox
- jq, jsonpointer, jmespath, funcy, morph, boltons, toolz
- json-spec, jdata, jolt, json-document-transforms, transon


## Resources
- https://pypi.org/project/json-spec/
- https://pypi.org/project/transon/
- https://pypi.org/project/jdata/
- https://github.com/microsoft/json-document-transforms
- https://github.com/Microsoft/json-document-transforms/wiki
- https://github.com/bazaarvoice/jolt
- https://stackoverflow.com/questions/76303733/exploring-jolt-functions-for-json-to-json-transformations-an-overview
- https://github.com/microsoft/JsonToJsonMapper
- https://pypi.org/project/jdt/
- https://github.com/videntity/json-data-tools
- https://github.com/datavis-tech/json-templates
- https://github.com/google/jsonnet
- https://github.com/jsonata-js/jsonata
- https://github.com/pacifica/python-jsonpath2
- https://github.com/reagento/adaptix
- https://blog.panoply.io/best-data-transformation-tools
18 changes: 16 additions & 2 deletions pyproject.toml
@@ -22,9 +22,10 @@ keywords = [
"encode",
"i/o",
"json",
"jsonpointer",
"luftdaten.info",
"map data",
"marshall",
"marshal",
"mongodb",
"nested data",
"sensor.community",
@@ -37,7 +38,7 @@ keywords = [
"translate",
"ttn",
"tts",
"unmarshall",
"unmarshal",
"unserialize",
"utility",
]
@@ -102,9 +103,13 @@ dynamic = [
dependencies = [
"attrs<25",
"backports-strenum<1.3; python_version<'3.11'",
"cattrs<24",
"simplejson<4",
"toolz<0.13",
]
optional-dependencies.all = [
"commons-codec[mongodb,zyp]",
]
optional-dependencies.develop = [
"mypy<1.12",
"poethepoet<0.28",
@@ -124,6 +129,14 @@ optional-dependencies.test = [
"pytest-cov<6",
"pytest-mock<4",
]
optional-dependencies.zyp = [
"jmespath<1.1",
"jq<1.8",
"jsonpointer<4",
"python-dateutil<2.10",
"pyyaml<7",
"transon==0.0.7",
]

urls.Changelog = "https://github.com/daq-tools/commons-codec/blob/main/CHANGES.md"
urls.Documentation = "https://github.com/daq-tools/commons-codec/tree/main/docs"
@@ -229,6 +242,7 @@ source = [
branch = false
omit = [
"tests/*",
"src/commons_codec/vendor/so/dict.py",
]

[tool.coverage.report]
2 changes: 1 addition & 1 deletion src/commons_codec/model.py
@@ -6,7 +6,7 @@
if sys.version_info >= (3, 11):
    from enum import StrEnum
else:
-    from backports.strenum import StrEnum
+    from backports.strenum import StrEnum  # pragma: no cover

from attrs import define

35 changes: 35 additions & 0 deletions src/commons_codec/transform/function.py
@@ -0,0 +1,35 @@
import datetime as dt
import logging
import typing as t

import dateutil.parser

logger = logging.getLogger(__name__)


def to_datetime(value: t.Any, on_error: t.Literal["raise", "ignore"] = "ignore") -> t.Union[dt.datetime, None]:
    """Parse an arbitrary value into a `datetime` object, using `dateutil`."""
    if isinstance(value, dt.datetime):
        return value
    try:
        return dateutil.parser.parse(value)
    except (TypeError, dateutil.parser.ParserError) as ex:
        logger.warning(f"Parsing value into datetime failed: {value}. Reason: {ex}")
        if on_error == "ignore":
            return None
        elif on_error == "raise":
            raise


def to_unixtime(value: t.Any, on_error: t.Literal["raise", "ignore"] = "ignore") -> t.Union[float, None]:
    """Convert an arbitrary value into a UNIX timestamp (seconds since epoch), as `float`."""
    if isinstance(value, float):
        return value
    if isinstance(value, int):
        return float(value)
    # Convert anything that is not a `datetime` yet.
    if value is not None and not isinstance(value, dt.datetime):
        value = to_datetime(value, on_error=on_error)
    if value is None:
        if on_error == "ignore":
            return None
        elif on_error == "raise":
            raise ValueError(f"Converting value to unixtime failed: {value}")
    return value.timestamp()
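For reference, a brief usage sketch of these built-in conversion functions;
the example values and expected results are illustrative only.
```python
from commons_codec.transform.function import to_datetime, to_unixtime

to_datetime("2024-08-11T10:00:00Z")          # datetime(2024, 8, 11, 10, 0, tzinfo=tzutc())
to_unixtime("2024-08-11T10:00:00Z")          # 1723370400.0
to_unixtime("not-a-date")                    # None, and a warning is logged
to_unixtime("not-a-date", on_error="raise")  # raises dateutil.parser.ParserError
```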
