From 98cac0978927f2c7083ba0bf9ae54116b8d01b0a Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sat, 10 Aug 2024 02:59:28 +0200 Subject: [PATCH] Add Zyp Transformations, a minimal transformation engine A data model and implementation for a compact transformation engine written in Python. - Based on JSON Pointer (RFC 6901), JMESPath, and transon - Implemented using `attrs` and `cattrs` - Includes built-in transformation functions `to_datetime` and `to_unixtime` - Ability to marshal and unmarshal its representation to/from JSON and YAML --- .github/workflows/tests.yml | 2 +- CHANGES.md | 2 + docs/backlog.md | 3 + docs/zyp/backlog.md | 46 +++ docs/zyp/index.md | 182 ++++++++++ docs/zyp/research.md | 24 ++ pyproject.toml | 18 +- src/commons_codec/model.py | 2 +- src/commons_codec/transform/function.py | 35 ++ src/commons_codec/transform/model.py | 379 ++++++++++++++++++++ src/commons_codec/vendor/so/__init__.py | 0 src/commons_codec/vendor/so/dict.py | 165 +++++++++ tests/assets/transformation.json | 35 ++ tests/assets/transformation.yaml | 24 ++ tests/transform/test_aws_dms.py | 5 + tests/transform/test_functions.py | 41 +++ tests/transform/test_recipe.py | 448 ++++++++++++++++++++++++ tests/zyp/__init__.py | 0 tests/zyp/test_model.py | 15 + tests/zyp/test_util.py | 49 +++ 20 files changed, 1471 insertions(+), 4 deletions(-) create mode 100644 docs/zyp/backlog.md create mode 100644 docs/zyp/index.md create mode 100644 docs/zyp/research.md create mode 100644 src/commons_codec/transform/function.py create mode 100644 src/commons_codec/transform/model.py create mode 100644 src/commons_codec/vendor/so/__init__.py create mode 100644 src/commons_codec/vendor/so/dict.py create mode 100644 tests/assets/transformation.json create mode 100644 tests/assets/transformation.yaml create mode 100644 tests/transform/test_functions.py create mode 100644 tests/transform/test_recipe.py create mode 100644 tests/zyp/__init__.py create mode 100644 tests/zyp/test_model.py create mode 100644 
tests/zyp/test_util.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index baf59aa..010c4c5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -104,7 +104,7 @@ jobs: pip install "setuptools>=64" --upgrade # Install package in editable mode. - pip install --use-pep517 --prefer-binary --editable=.[develop,test,mongodb] + pip install --use-pep517 --prefer-binary --editable=.[all,develop,test] - name: Run linters and software tests run: poe check diff --git a/CHANGES.md b/CHANGES.md index b970185..6de19bf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,8 @@ # Changelog ## Unreleased +- Added `BucketTransformation`, a minimal transformation engine + based on JSON Pointer (RFC 6901). ## 2024/08/05 v0.0.3 - Added transformer for AWS DMS to CrateDB SQL diff --git a/docs/backlog.md b/docs/backlog.md index 4ebf7c0..a82f959 100644 --- a/docs/backlog.md +++ b/docs/backlog.md @@ -12,3 +12,6 @@ - [ ] MongoDB: Implement stream resumption using `start_after` - [ ] Feature: Filter by events, e.g. Ignore "delete" events? - [ ] Integration Testing the "example" programs? +- [ ] Improve capabilities of DMS translator + https://github.com/daq-tools/commons-codec/issues/11 +- https://github.com/supabase/pg_replicate diff --git a/docs/zyp/backlog.md b/docs/zyp/backlog.md new file mode 100644 index 0000000..72264f8 --- /dev/null +++ b/docs/zyp/backlog.md @@ -0,0 +1,46 @@ +# Zyp Backlog + +## Iteration +1 +- Refactor module namespace to `zyp` +- Documentation +- CLI interface +- Apply to MongoDB Table Loader in CrateDB Toolkit + +## Iteration +2 +Demonstrate! 
+- math expressions +- omit key (recursively) +- combine keys +- filter on keys and/or values +- Pathological cases like "Not defined" in typed fields like `TIMESTAMP` +- Use simpleeval, like Meltano, and provide the same built-in functions +- https://sdk.meltano.com/en/v0.39.1/stream_maps.html#other-built-in-functions-and-names +- https://github.com/MeltanoLabs/meltano-map-transform/pull/255 +- https://github.com/MeltanoLabs/meltano-map-transform/issues/252 +- Use JSONPath, see https://sdk.meltano.com/en/v0.39.1/code_samples.html#use-a-jsonpath-expression-to-extract-the-next-page-url-from-a-hateoas-response + +## Iteration +3 +- Moksha transformations on Buckets +- Investigate using JSON Schema +- Fluent API interface +- https://github.com/Halvani/alphabetic +- Mappers do not support external API lookups. + To add external API lookups, you can either (a) land all your data and + then join it using a transformation tool like dbt, or (b) create a custom + mapper plugin with inline lookup logic. 
+ => Example from Luftdatenpumpe, using a reverse geocoder +- [ ] Define schema + https://sdk.meltano.com/en/latest/typing.html +- https://docs.meltano.com/guide/v2-migration/#migrate-to-an-adapter-specific-dbt-transformer +- https://github.com/meltano/sdk/blob/v0.39.1/singer_sdk/mapper.py + +## Fluent API Interface +```python +from commons_codec.transform.model import FluentTransformation + +transformation = FluentTransformation() \ + .jmes("records[?starts_with(location, 'B')]") \ + .rename_fields({"_id": "id"}) \ + .convert_values({"/id": "int", "/value": "float"}, type="pointer-python") \ + .jq(".[] |= (.value /= 100)") +``` diff --git a/docs/zyp/index.md b/docs/zyp/index.md new file mode 100644 index 0000000..4f4a706 --- /dev/null +++ b/docs/zyp/index.md @@ -0,0 +1,182 @@ +# Zyp Transformations + +## About +A data model and implementation for a compact transformation engine written +in [Python], based on [JSON Pointer] (RFC 6901), [JMESPath], and [transon], +implemented using [attrs] and [cattrs]. + +## Ideas +:Conciseness: + Define a multistep data refinement process with as little code as possible. +:Low Footprint: + Doesn't need any infrastructure or pipeline framework. It's just a little library. +:Interoperability: + Marshal transformation recipe definition to/from text-only representations (JSON, + YAML), in order to encourage implementations in other languages. +:Performance: + Well, it is written in Python. Fragments can be re-written in Rust, when applicable. +:Immediate: + Other ETL frameworks and concepts often need to first land your data in the target + system before applying subsequent transformations. Zyp is working directly within + the data pipeline, before data is inserted into the target system. + +## Synopsis I +A basic transformation example for individual data records. +```python +from commons_codec.transform.model import * + +# Consider a slightly messy collection of records. 
+data_in = [ + {"_id": "123", "name": "device-foo", "reading": "42.42"}, + {"_id": "456", "name": "device-bar", "reading": -84.01}, +] + +# Define a transformation that renames the `_id` field to `id`, +# casts its value to `int`, and casts the `reading` field to `float`. +transformation = BucketTransformation( + names=FieldRenamer().add(old="_id", new="id"), + values=ValueConverter() + .add(pointer="/id", transformer="builtins.int") + .add(pointer="/reading", transformer="builtins.float"), +) + +for record in data_in: + print(transformation.apply(record)) +``` +The result is a transformed data collection. +```json +[ + {"id": 123, "name": "device-foo", "reading": 42.42}, + {"id": 456, "name": "device-bar", "reading": -84.01} +] +``` + +## Synopsis II +A more advanced transformation example for a collection of data records. + +Consider a messy collection of input data. +- The actual collection is nested within the top-level `records` item. +- `_id` fields are conveyed in string format. +- `value` fields include both integer and string values. +- `value` fields are fixed-point values, using a scaling factor of `100`. +- The collection includes invalid `null` records. + Those records usually trip up processing when, for example, filtering on object items. +```python +data_in = { + "message-source": "system-3000", + "message-type": "eai-warehouse", + "records": [ + {"_id": "12", "meta": {"name": "foo", "location": "B"}, "data": {"value": "4242"}}, + None, + {"_id": "34", "meta": {"name": "bar", "location": "BY"}, "data": {"value": -8401}}, + {"_id": "56", "meta": {"name": "baz", "location": "NI"}, "data": {"value": 2323}}, + {"_id": "78", "meta": {"name": "qux", "location": "NRW"}, "data": {"value": -580}}, + None, + None, + ], +} +``` + +After applying a corresponding transformation, the expected outcome is a +collection of valid records, optionally filtered, and values adjusted according +to relevant type hints and other conversions. 
+```python +data_out = [ + {"id": 12, "meta": {"name": "foo", "location": "B"}, "data": {"value": 42.42}}, + {"id": 34, "meta": {"name": "bar", "location": "BY"}, "data": {"value": -84.01}}, +] +``` + +Let's come up with relevant pre-processing rules to cleanse and mangle the shape of the +input collection. In order to make this example more exciting, let's include two special +needs: +- Filter input collection by value of nested element. +- Rename top-level fields starting with underscore `_`. + +Other than those special rules, the fundamental ones to re-shape the data are: +- Unwrap `records` attribute from container dictionary into actual collection. +- Filter collection, both by omitting invalid/empty records, and by applying query + constraints. +- On each record, rename the top-level `_id` field to `id`. +- On each record, adjust the data types of the `id` and `value` fields. +- Postprocess collection, applying a custom scaling factor to the `value` field. + +Zyp lets you concisely write those rules down, using the Python language. +```python +from commons_codec.transform.model import * + +transformation = CollectionTransformation( + pre=MokshaTransformation().jmes("records[?not_null(meta.location) && !starts_with(meta.location, 'N')]"), + bucket=BucketTransformation( + names=FieldRenamer().add(old="_id", new="id"), + values=ValueConverter() + .add(pointer="/id", transformer="builtins.int") + .add(pointer="/data/value", transformer="builtins.float"), + ), + post=MokshaTransformation().jq(".[] |= (.data.value /= 100)"), +) + +data_out = transformation.apply(data_in) +``` +Alternatively, serialize the `zyp-collection` transformation description, +for example into YAML format. 
+```python +print(transformation.to_yaml()) +``` +```yaml +meta: + version: 1 + type: zyp-collection +pre: + rules: + - expression: records[?not_null(meta.location) && !starts_with(meta.location, 'N')] + type: jmes +bucket: + names: + rules: + - new: id + old: _id + values: + rules: + - args: [] + pointer: /id + transformer: builtins.int + - args: [] + pointer: /data/value + transformer: builtins.float +post: + rules: + - expression: .[] |= (.data.value /= 100) + type: jq +``` + + +## Prior Art +- [Singer Transformer] +- [PipelineWise Transformations] +- [singer-transform] +- [Meltano Inline Data Mapping] +- [Meltano Inline Stream Maps] +- [AWS DMS source filter rules] +- [AWS DMS table selection and transformation rules] +- ... and many more. Thanks for the inspirations. + + +## Etymology +With kudos to [Kris Zyp] for conceiving [JSON Pointer]. + + +[attrs]: https://www.attrs.org/ +[AWS DMS source filter rules]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.Filters.html +[AWS DMS table selection and transformation rules]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.html +[cattrs]: https://catt.rs/ +[Kris Zyp]: https://github.com/kriszyp +[JMESPath]: https://jmespath.org/ +[JSON Pointer]: https://datatracker.ietf.org/doc/html/rfc6901 +[Meltano Inline Data Mapping]: https://docs.meltano.com/guide/mappers/ +[Meltano Inline Stream Maps]: https://sdk.meltano.com/en/latest/stream_maps.html +[PipelineWise Transformations]: https://transferwise.github.io/pipelinewise/user_guide/transformations.html +[Python]: https://en.wikipedia.org/wiki/Python_(programming_language) +[Singer Transformer]: https://github.com/singer-io/singer-python/blob/master/singer/transform.py +[singer-transform]: https://github.com/dkarzon/singer-transform +[transon]: https://transon-org.github.io/ diff --git a/docs/zyp/research.md b/docs/zyp/research.md new file mode 100644 index 0000000..4b15136 --- 
/dev/null +++ b/docs/zyp/research.md @@ -0,0 +1,24 @@ +# Transformer Ingredients Research + +## Toolbox +- jq, jsonpointer, jmespath, funcy, morph, boltons, toolz +- json-spec, jdata, jolt, json-document-transforms, transon + + +## Resources +- https://pypi.org/project/json-spec/ +- https://pypi.org/project/transon/ +- https://pypi.org/project/jdata/ +- https://github.com/microsoft/json-document-transforms +- https://github.com/Microsoft/json-document-transforms/wiki +- https://github.com/bazaarvoice/jolt +- https://stackoverflow.com/questions/76303733/exploring-jolt-functions-for-json-to-json-transformations-an-overview +- https://github.com/microsoft/JsonToJsonMapper +- https://pypi.org/project/jdt/ +- https://github.com/videntity/json-data-tools +- https://github.com/datavis-tech/json-templates +- https://github.com/google/jsonnet +- https://github.com/jsonata-js/jsonata +- https://github.com/pacifica/python-jsonpath2 +- https://github.com/reagento/adaptix +- https://blog.panoply.io/best-data-transformation-tools diff --git a/pyproject.toml b/pyproject.toml index ae613fa..39511f5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,9 +22,10 @@ keywords = [ "encode", "i/o", "json", + "jsonpointer", "luftdaten.info", "map data", - "marshall", + "marshal", "mongodb", "nested data", "sensor.community", @@ -37,7 +38,7 @@ keywords = [ "translate", "ttn", "tts", - "unmarshall", + "unmarshal", "unserialize", "utility", ] @@ -102,9 +103,13 @@ dynamic = [ dependencies = [ "attrs<25", "backports-strenum<1.3; python_version<'3.11'", + "cattrs<24", "simplejson<4", "toolz<0.13", ] +optional-dependencies.all = [ + "commons-codec[mongodb,zyp]", +] optional-dependencies.develop = [ "mypy<1.12", "poethepoet<0.28", @@ -124,6 +129,14 @@ optional-dependencies.test = [ "pytest-cov<6", "pytest-mock<4", ] +optional-dependencies.zyp = [ + "jmespath<1.1", + "jq<1.8", + "jsonpointer<4", + "python-dateutil<2.10", + "pyyaml<7", + "transon==0.0.7", +] urls.Changelog = 
"https://github.com/daq-tools/commons-codec/blob/main/CHANGES.md" urls.Documentation = "https://github.com/daq-tools/commons-codec/tree/main/docs" @@ -229,6 +242,7 @@ source = [ branch = false omit = [ "tests/*", + "src/commons_codec/vendor/so/dict.py", ] [tool.coverage.report] diff --git a/src/commons_codec/model.py b/src/commons_codec/model.py index c86f422..b69f808 100644 --- a/src/commons_codec/model.py +++ b/src/commons_codec/model.py @@ -6,7 +6,7 @@ if sys.version_info >= (3, 11): from enum import StrEnum else: - from backports.strenum import StrEnum + from backports.strenum import StrEnum # pragma: no cover from attrs import define diff --git a/src/commons_codec/transform/function.py b/src/commons_codec/transform/function.py new file mode 100644 index 0000000..bc87923 --- /dev/null +++ b/src/commons_codec/transform/function.py @@ -0,0 +1,35 @@ +import datetime as dt +import logging +import typing as t + +import dateutil.parser + +logger = logging.getLogger(__name__) + + +def to_datetime(value: t.Any, on_error: t.Literal["raise", "ignore"] = "ignore") -> t.Union[dt.datetime, None]: + if isinstance(value, dt.datetime): + return value + try: + return dateutil.parser.parse(value) + except (TypeError, dateutil.parser.ParserError) as ex: + logger.warning(f"Parsing value into datetime failed: {value}. 
Reason: {ex}") + if on_error == "ignore": + return None + elif on_error == "raise": + raise + + +def to_unixtime(value: t.Any, on_error: t.Literal["raise", "ignore"] = "ignore") -> t.Union[float, None]: + if isinstance(value, float): + return value + if isinstance(value, int): + return float(value) + if value is not None and not isinstance(value, dt.datetime): + value = to_datetime(value, on_error=on_error) + if value is None: + if on_error == "ignore": + return None + elif on_error == "raise": + raise ValueError(f"Converting value to unixtime failed: {value}") + return value.timestamp() diff --git a/src/commons_codec/transform/model.py b/src/commons_codec/transform/model.py new file mode 100644 index 0000000..003683b --- /dev/null +++ b/src/commons_codec/transform/model.py @@ -0,0 +1,379 @@ +import collections +import importlib +import logging +import typing as t +from collections import OrderedDict + +import attr +import jmespath +import jq +import jsonpointer +import transon +from attr import Factory +from attrs import define +from cattrs.preconf.json import make_converter as make_json_converter +from cattrs.preconf.pyyaml import make_converter as make_yaml_converter +from jsonpointer import JsonPointer, JsonPointerException + +from commons_codec.vendor.so.dict import OrderedDictX + +logger = logging.getLogger(__name__) + + +@define +class SchemaDefinitionRule: + name: str + type: str + + +@define +class SchemaDefinition: + rules: t.List[SchemaDefinitionRule] = Factory(list) + map: t.Dict[str, str] = Factory(dict) + + def add(self, pointer: str, type: str) -> "SchemaDefinition": # noqa: A002 + self.rules.append(SchemaDefinitionRule(name=pointer, type=type)) + self.map[pointer] = type + return self + + +@define +class ConverterRuleBase: + def compile(self): + raise NotImplementedError("Please implement this method") + + +@define +class ValueConverterRule(ConverterRuleBase): + pointer: str + transformer: str + args: t.Union[t.List[t.Any], None] = Factory(list) + + 
def compile(self): + pointer = to_pointer(self.pointer) + if isinstance(self.transformer, str): + if not self.transformer: + raise ValueError("Empty transformer reference") + transformer_function = self._resolve_fun(self.transformer) + else: + transformer_function = self.transformer + if self.args: + transformer_function = transformer_function(*self.args) + return ValueConverterRuntimeRule(pointer=pointer, transformer=transformer_function) + + @staticmethod + def _resolve_fun(symbol: str) -> t.Callable: + if "." not in symbol: + symbol = f"commons_codec.transform.function.{symbol}" + modname, symbol = symbol.rsplit(".", 1) + mod = importlib.import_module(modname) + return getattr(mod, symbol) + + +@define +class ValueConverterRuntimeRule: + pointer: jsonpointer.JsonPointer + transformer: t.Callable + + +@define +class ConverterBase: + rules: t.List[t.Any] = Factory(list) + _runtime_rules: t.List[t.Any] = Factory(list) + + def __attrs_post_init__(self): + if self.rules and not self._runtime_rules: + for rule in self.rules: + self._add_runtime(rule) + + def _add_rule(self, rule): + self.rules.append(rule) + self._add_runtime(rule) + return self + + def _add_runtime(self, rule): + self._runtime_rules.append(rule.compile()) + return self + + +@define +class ValueConverter(ConverterBase): + rules: t.List[ValueConverterRule] = Factory(list) + _runtime_rules: t.List[ValueConverterRuntimeRule] = Factory(list) + + def add(self, pointer: str, transformer: str, args: t.List[t.Any] = None) -> "ValueConverter": + self._add_rule(ValueConverterRule(pointer=pointer, transformer=transformer, args=args)) + return self + + def apply(self, data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + for rule in self._runtime_rules: + data = swapnode(rule.pointer, data, rule.transformer) + return data + + +@define +class FieldRenamerRule: + old: str + new: str + + +@define +class FieldRenamer: + rules: t.List[FieldRenamerRule] = Factory(list) + + def add(self, old: str, new: str) -> 
"FieldRenamer": + self.rules.append(FieldRenamerRule(old=old, new=new)) + return self + + def apply(self, data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + d = OrderedDictX(data) + for rule in self.rules: + d.rename_key(rule.old, rule.new) + return d + + +TransonTemplate = t.Dict[str, t.Any] + + +@define +class TransonRule: + pointer: str + template: TransonTemplate + + def compile(self): + return TransonRuntimeRule(to_pointer(self.pointer), transformer=transon.Transformer(self.template)) + + +@define +class TransonRuntimeRule: + pointer: JsonPointer + transformer: transon.Transformer + + +def to_pointer(pointer: t.Union[str, JsonPointer]) -> JsonPointer: + if isinstance(pointer, str): + try: + return jsonpointer.JsonPointer(pointer) + except JsonPointerException as ex: + raise ValueError(ex) from ex + elif isinstance(pointer, JsonPointer): + return pointer + else: + raise TypeError(f"Value is not of type str or JsonPointer: {type(pointer).__name__}") + + +@define +class TransonTransformation(ConverterBase): + rules: t.List[TransonRule] = Factory(list) + _runtime_rules: t.List[TransonRuntimeRule] = Factory(list) + + def add(self, pointer: str, template: TransonTemplate) -> "TransonTransformation": + self._add_rule(TransonRule(pointer=pointer, template=template)) + return self + + def apply(self, data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + for rule in self._runtime_rules: + data = swapnode(rule.pointer, data, rule.transformer.transform) + return data + + +def swapnode(pointer: JsonPointer, value: t.Any, fun: t.Callable = None) -> JsonPointer: + node = pointer.get(value) + if fun is not None: + node = fun(node) + inplace = bool(pointer.parts) + return pointer.set(value, node, inplace=inplace) + + +def no_privates_no_nulls(key, value) -> bool: + """ + A filter for `attr.asdict`, to suppress private attributes. 
+ """ + is_private = key.name.startswith("_") + is_null = value is None + if is_private or is_null: + return False + return True + + +@define +class Metadata: + version: t.Union[int, None] = None + type: t.Union[str, None] = None + + +@define +class Dumpable: + meta: t.Union[Metadata, None] = None + + def to_dict(self) -> t.Dict[str, t.Any]: + return attr.asdict(self, dict_factory=OrderedDict, filter=no_privates_no_nulls) + + def to_json(self) -> str: + converter = make_json_converter(dict_factory=OrderedDict) + return converter.dumps(self.to_dict()) + + def to_yaml(self) -> str: + converter = make_yaml_converter(dict_factory=OrderedDict) + return converter.dumps(self.to_dict()) + + @classmethod + def from_dict(cls, data: t.Dict[str, t.Any]): + return cls(**data) + + @classmethod + def from_json(cls, json_str: str): + converter = make_json_converter(dict_factory=OrderedDict) + return converter.loads(json_str, cls) + + @classmethod + def from_yaml(cls, json_str: str): + converter = make_yaml_converter(dict_factory=OrderedDict) + return converter.loads(json_str, cls) + + +@define +class BucketTransformation(Dumpable): + """ + A minimal transformation engine. 
+ + Based on: + - JSON Pointer (RFC 6901) + - Transon + + Documentation: + - https://www.rfc-editor.org/rfc/rfc6901 + - https://transon-org.github.io/ + """ + + meta: Metadata = Metadata(version=1, type="zyp-bucket") + schema: t.Union[SchemaDefinition, None] = None + names: t.Union[FieldRenamer, None] = None + values: t.Union[ValueConverter, None] = None + transon: t.Union[TransonTransformation, None] = None + + def apply(self, data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + if self.names: + data = self.names.apply(data) + if self.values: + data = self.values.apply(data) + if self.transon: + data = self.transon.apply(data) + return data + + +MokshaTransformer = t.Union[jmespath.parser.ParsedResult, jq._Program, transon.Transformer] + + +@define +class MokshaRule: + type: str + expression: t.Union[str, TransonTemplate] + + def compile(self): + return MokshaRuntimeRule(self.type, compile_expression(self.type, self.expression)) + + +def compile_expression(type: str, expression: t.Union[str, TransonTemplate]) -> MokshaTransformer: # noqa: A002 + if type == "jmes": + return jmespath.compile(expression) + elif type == "jq": + return jq.compile(expression) + elif type == "transon": + return transon.Transformer(expression) + else: + raise TypeError(f"Compilation failed. Type must be either jmes or jq or transon: {type}") + + +Record = t.Dict[str, t.Any] +Collection = t.List[Record] +DictOrList = t.Union[Record, Collection] + + +@define +class MokshaRuntimeRule: + type: str + transformer: MokshaTransformer + + def evaluate(self, data: DictOrList) -> DictOrList: + if isinstance(self.transformer, jmespath.parser.ParsedResult): + return self.transformer.search(data, options=jmespath.Options(dict_cls=collections.OrderedDict)) + elif isinstance(self.transformer, jq._Program): + return self.transformer.input_value(data).first() + elif isinstance(self.transformer, transon.Transformer): + return self.transformer.transform(data) + else: + raise TypeError(f"Evaluation failed. 
Type must be either jmes or jq or transon: {self.transformer}") + + +@define +class MokshaTransformation(ConverterBase): + rules: t.List[MokshaRule] = Factory(list) + _runtime_rules: t.List[MokshaRuntimeRule] = Factory(list) + + def jmes(self, expression: str) -> "MokshaTransformation": + if not expression: + raise ValueError("JMESPath expression cannot be empty") + + self._add_rule(MokshaRule(type="jmes", expression=expression)) + return self + + def jq(self, expression: str) -> "MokshaTransformation": + if not expression: + raise ValueError("jq expression cannot be empty") + + self._add_rule(MokshaRule(type="jq", expression=expression)) + return self + + def transon(self, expression: TransonTemplate) -> "MokshaTransformation": + if not expression: + raise ValueError("transon expression cannot be empty") + + self._add_rule(MokshaRule(type="transon", expression=expression)) + return self + + def apply(self, data: DictOrList) -> DictOrList: + for rule in self._runtime_rules: + data = rule.evaluate(data) + return data + + +@define +class CollectionTransformation(Dumpable): + meta: Metadata = Metadata(version=1, type="zyp-collection") + schema: t.Union[SchemaDefinition, None] = None + pre: t.Union[MokshaTransformation, None] = None + bucket: t.Union[BucketTransformation, None] = None + post: t.Union[MokshaTransformation, None] = None + + def apply(self, data: DictOrList) -> Collection: + collection = t.cast(Collection, data) + if self.pre: + collection = t.cast(Collection, self.pre.apply(collection)) + collection_out: Collection = [] + if self.bucket: + for item in collection: + item = self.bucket.apply(item) + collection_out.append(item) + if self.post: + collection_out = t.cast(Collection, self.post.apply(collection_out)) + return collection_out + + +@define +class FluentTransformation(ConverterBase): + rules = t.List[t.Any] + + def jmes(self, expression) -> "FluentTransformation": + self._add_rule(MokshaRule(type="jmes", expression=expression)) + return self + + 
def jq(self, expression) -> "FluentTransformation": + self._add_rule(MokshaRule(type="jq", expression=expression)) + return self + + def rename_fields(self, definition: t.Dict[str, t.Any]) -> "FluentTransformation": + return self + + def convert_values(self, definition: t.Dict[str, t.Any], type: str) -> "FluentTransformation": # noqa: A002 + return self diff --git a/src/commons_codec/vendor/so/__init__.py b/src/commons_codec/vendor/so/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/commons_codec/vendor/so/dict.py b/src/commons_codec/vendor/so/dict.py new file mode 100644 index 0000000..0459624 --- /dev/null +++ b/src/commons_codec/vendor/so/dict.py @@ -0,0 +1,165 @@ +""" +OrderedDictX by Zuzu Corneliu. + +For the keeping of order case (the other one is trivial, remove old and add new +one): I was not satisfied with the ordered-dictionary needing reconstruction +(at least partially), obviously for efficiency reasons, so I've put together a +class (OrderedDictX) that extends OrderedDict and allows you to do key changes +efficiently, i.e. in O(1) complexity. The implementation can also be adjusted +for the now-ordered built-in dict class. + +It uses 2 extra dictionaries to remap the changed keys ("external" - i.e. as +they appear externally to the user) to the ones in the underlying OrderedDict +("internal") - the dictionaries will only hold keys that were changed so as +long as no key changing is done they will be empty. + +As expected, the splicing method is extremely slow (didn't expect it to be that +much slower either though) and uses a lot of memory, and the O(N) solution of +@Ashwini Chaudhary (bug-fixed though, del also needed) is also slower, 17X +times in this example. + +Of course, this solution being O(1), compared to the O(N) OrderedDictRaymond +the time difference becomes much more apparent as the dictionary size +increases, e.g. for 5 times more elements (100000), the O(N) is 100X slower. 
# Vendored from:
# https://stackoverflow.com/questions/16475384/rename-a-dictionary-key/75115645#75115645

from collections import OrderedDict


class OrderedDictX(OrderedDict):
    """
    Ordered dictionary whose keys can be renamed in place, keeping their position.

    A rename does not touch the underlying `OrderedDict` storage. Two side
    tables translate between the key the caller sees ("external") and the key
    actually stored ("internal"). They only contain keys whose external and
    internal names differ.

    `get`, `pop` and `clear` are overridden on purpose: `dict.get` and
    `OrderedDict.clear` (and `OrderedDict.pop` on current CPython) work on the
    underlying storage directly, so they would ignore the translation tables.
    """

    # Sentinel telling `pop()` that the caller supplied no default value.
    _MISSING = object()

    def __init__(self, *args, **kwargs):
        super().__init__()
        # Mappings from new->old (ext2int), old->new (int2ext).
        # Only the keys that are changed (internal key doesn't match what the user sees) are contained.
        self._keys_ext2int = OrderedDict()
        self._keys_int2ext = OrderedDict()
        self.update(*args, **kwargs)

    def rename_key(self, k_old, k_new):
        """
        Rename `k_old` to `k_new` without changing the entry's position.

        :raises KeyError: When `k_old` is not present, or `k_new` is already in use.
        """
        # Validate that the old key is part of the dict
        if not self.__contains__(k_old):
            raise KeyError(f"Cannot rename key {k_old} to {k_new}: {k_old} not existing in dict")

        # Return if no changing is actually to be done
        if len(OrderedDict.fromkeys([k_old, k_new])) == 1:
            return

        # Validate that the new key would not conflict with another one
        if self.__contains__(k_new):
            raise KeyError(f"Cannot rename key {k_old} to {k_new}: {k_new} already in dict")

        # Change the key using internal dicts mechanism
        if k_old in self._keys_ext2int:
            # Revert change temporarily
            k_old_int = self._keys_ext2int[k_old]
            del self._keys_ext2int[k_old]
            k_old = k_old_int
            # Check if new key matches the internal key
            if len(OrderedDict.fromkeys([k_old, k_new])) == 1:
                del self._keys_int2ext[k_old]
                return

        # Finalize key change
        self._keys_ext2int[k_new] = k_old
        self._keys_int2ext[k_old] = k_new

    def __contains__(self, k) -> bool:
        if k in self._keys_ext2int:
            return True
        if not super().__contains__(k):
            return False
        # Stored under this name, but only visible if it has not been renamed away.
        return k not in self._keys_int2ext

    def __getitem__(self, k):
        if not self.__contains__(k):
            # Intentionally raise KeyError in ext2int
            return self._keys_ext2int[k]
        return super().__getitem__(self._keys_ext2int.get(k, k))

    def __setitem__(self, k, v):
        if k in self._keys_ext2int:
            return super().__setitem__(self._keys_ext2int[k], v)
        # `k` is not visible externally, but its name is occupied in the
        # underlying storage by an entry that has been renamed away. Store the
        # new entry under a free internal name and register the translation in
        # BOTH tables; registering only ext2int breaks iteration and deletion.
        if k in self._keys_int2ext:
            k_int = self._keys_int2ext[k]
            # The external name found may itself be occupied internally by
            # another renamed entry. Follow the chain until a free name turns up.
            while k_int in self._keys_int2ext:
                k_int = self._keys_int2ext[k_int]
            self._keys_ext2int[k] = k_int
            self._keys_int2ext[k_int] = k
            k = k_int
        return super().__setitem__(k, v)

    def __delitem__(self, k):
        if not self.__contains__(k):
            # Intentionally raise KeyError in ext2int
            del self._keys_ext2int[k]
        if k in self._keys_ext2int:
            k_int = self._keys_ext2int[k]
            del self._keys_ext2int[k]
            del self._keys_int2ext[k_int]
            k = k_int
        return super().__delitem__(k)

    def __iter__(self):
        yield from self.keys()

    def __reversed__(self):
        for k in reversed(super().keys()):
            yield self._keys_int2ext.get(k, k)

    def __eq__(self, other: object) -> bool:
        # Order-sensitive comparison of (key, value) pairs, also against plain dicts.
        if not isinstance(other, dict):
            return False
        if len(self) != len(other):
            return False
        for (k, v), (k_other, v_other) in zip(self.items(), other.items()):
            if k != k_other or v != v_other:
                return False
        return True

    def get(self, k, default=None):
        """
        Return the value for external key `k`, or `default` when it is absent.
        """
        if self.__contains__(k):
            return self.__getitem__(k)
        return default

    def pop(self, k, default=_MISSING):
        """
        Remove external key `k` and return its value.

        :raises KeyError: When `k` is absent and no `default` was given.
        """
        if self.__contains__(k):
            v = self.__getitem__(k)
            self.__delitem__(k)
            return v
        if default is self._MISSING:
            raise KeyError(k)
        return default

    def clear(self):
        """
        Remove all entries, including the rename bookkeeping.
        """
        self._keys_ext2int.clear()
        self._keys_int2ext.clear()
        super().clear()

    def update(self, *args, **kwargs):
        for k, v in OrderedDict(*args, **kwargs).items():
            self.__setitem__(k, v)

    def popitem(self, last=True) -> tuple:
        if not last:
            k = next(iter(self.keys()))
        else:
            k = next(iter(reversed(self.keys())))
        v = self.__getitem__(k)
        self.__delitem__(k)
        return k, v

    class OrderedDictXKeysView:
        """
        Iterable over the external keys, in storage order.
        """

        def __init__(self, odx: "OrderedDictX", orig_keys):
            self._odx = odx
            self._orig_keys = orig_keys

        def __len__(self):
            return len(self._orig_keys)

        def __contains__(self, k):
            return k in self._odx

        def __iter__(self):
            for k in self._orig_keys:
                yield self._odx._keys_int2ext.get(k, k)

        def __reversed__(self):
            for k in reversed(self._orig_keys):
                yield self._odx._keys_int2ext.get(k, k)

    class OrderedDictXItemsView:
        """
        Iterable over (external key, value) pairs, in storage order.
        """

        def __init__(self, odx: "OrderedDictX", orig_items):
            self._odx = odx
            self._orig_items = orig_items

        def __len__(self):
            return len(self._orig_items)

        def __iter__(self):
            for k, v in self._orig_items:
                yield self._odx._keys_int2ext.get(k, k), v

        def __reversed__(self):
            for k, v in reversed(self._orig_items):
                yield self._odx._keys_int2ext.get(k, k), v

    def keys(self):
        return self.OrderedDictXKeysView(self, super().keys())

    def items(self):
        return self.OrderedDictXItemsView(self, super().items())

    def copy(self):
        return OrderedDictX(self.items())
self.OrderedDictXKeysView(self, super().keys()) + + def items(self): + return self.OrderedDictXItemsView(self, super().items()) + + def copy(self): + return OrderedDictX(self.items()) diff --git a/tests/assets/transformation.json b/tests/assets/transformation.json new file mode 100644 index 0000000..2acac46 --- /dev/null +++ b/tests/assets/transformation.json @@ -0,0 +1,35 @@ +{ + "meta": { + "version": 1, + "type": "zyp-bucket" + }, + "schema": { + "rules": [ + { + "name": "/meta/date", + "type": "DATETIME" + } + ], + "map": { + "/meta/date": "DATETIME" + } + }, + "names": { + "rules": [ + { + "old": "_id", + "new": "id" + } + ] + }, + "values": { + "rules": [ + { + "pointer": "/meta/date", + "transformer": "to_unixtime", + "args": [] + } + ] + }, + "transon": null +} diff --git a/tests/assets/transformation.yaml b/tests/assets/transformation.yaml new file mode 100644 index 0000000..66f83da --- /dev/null +++ b/tests/assets/transformation.yaml @@ -0,0 +1,24 @@ +meta: + version: 1 + type: zyp-collection +pre: + rules: + - expression: records[?not_null(meta.location) && !starts_with(meta.location, 'N')] + type: jmes +bucket: + names: + rules: + - new: id + old: _id + values: + rules: + - args: [] + pointer: /id + transformer: builtins.int + - args: [] + pointer: /data/value + transformer: builtins.float +post: + rules: + - expression: .[] |= (.data.value /= 100) + type: jq diff --git a/tests/transform/test_aws_dms.py b/tests/transform/test_aws_dms.py index 3fce7a9..ddf825e 100644 --- a/tests/transform/test_aws_dms.py +++ b/tests/transform/test_aws_dms.py @@ -1,4 +1,5 @@ # ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction +import base64 import json import pytest @@ -276,3 +277,7 @@ def test_decode_cdc_delete_failure(cdc): with pytest.raises(ValueError) as ex: DMSTranslatorCrateDB().to_sql(MSG_DATA_DELETE) assert ex.match("Unable to invoke DML operation without primary key information") + + +if __name__ == "__main__": + 
print(base64.b64encode(json.dumps(MSG_DATA_INSERT).encode("utf-8"))) # noqa: T201 diff --git a/tests/transform/test_functions.py b/tests/transform/test_functions.py new file mode 100644 index 0000000..29a9c64 --- /dev/null +++ b/tests/transform/test_functions.py @@ -0,0 +1,41 @@ +import datetime as dt + +import pytest +from commons_codec.transform.function import to_datetime, to_unixtime +from dateutil.parser import ParserError + +stddate = dt.datetime(2023, 6, 30) + + +def test_to_datetime_success(): + assert to_datetime("06/30/2023") == stddate + assert to_datetime("06/05/2023") == dt.datetime(2023, 6, 5) + assert to_datetime(stddate) == stddate + assert to_datetime("---") is None + assert to_datetime(None) is None + + +def test_to_datetime_failure(): + with pytest.raises(ParserError) as ex: + to_datetime("---", on_error="raise") + assert ex.match("String does not contain a date: ---") + + +def test_to_unixtime_success(): + assert to_unixtime("06/30/2023") == 1688076000.0 + assert to_unixtime("06/05/2023") == 1685916000.0 + assert to_unixtime(stddate) == 1688076000.0 + assert to_unixtime("---") is None + assert to_unixtime(123) == 123 + assert to_unixtime(123.45) == 123.45 + assert to_unixtime(None) is None + + +def test_to_unixtime_failure(): + with pytest.raises(ParserError) as ex: + to_unixtime("---", on_error="raise") + assert ex.match("String does not contain a date: ---") + + with pytest.raises(ValueError) as ex: + to_unixtime(None, on_error="raise") + assert ex.match("Converting value to unixtime failed: None") diff --git a/tests/transform/test_recipe.py b/tests/transform/test_recipe.py new file mode 100644 index 0000000..acc9570 --- /dev/null +++ b/tests/transform/test_recipe.py @@ -0,0 +1,448 @@ +import datetime as dt +import json +from copy import deepcopy +from pathlib import Path + +import pytest +import yaml +from commons_codec.transform.model import ( + BucketTransformation, + CollectionTransformation, + FieldRenamer, + MokshaRule, + 
MokshaTransformation, + SchemaDefinition, + TransonTransformation, + ValueConverter, +) +from jmespath.exceptions import ParseError + + +class ReadingWithTimestamps: + """ + An example dataset including a variety of timestamps. + + null + 06/30/2023 + 07/31/2022 00:00:00 + 2022-07-07 + Invalid date + """ + + ingress = { + "meta": { + "american_date": "06/30/2023", + "american_date_time": "06/14/2022 12:42:24", + "empty_date": "", + "international_date": "2022-07-07", + "invalid_date": "Invalid date", + "none_date": None, + "null_date": "null", + }, + "data": { + "temperature": 42.42, + "humidity": 84.84, + }, + } + egress = { + "meta": { + "american_date": dt.datetime(2023, 6, 30, 0, 0, 0), + "american_date_time": dt.datetime(2022, 6, 14, 12, 42, 24), + "empty_date": None, + "international_date": dt.datetime(2022, 7, 7), + "invalid_date": None, + "none_date": None, + "null_date": None, + }, + "data": { + "temperature": 42.42, + "humidity": 84.84, + }, + } + + +class BasicReading: + ingress = { + "_id": "foobar", + "meta": { + "date": "06/14/2022 12:42:24", + }, + "data": { + "temperature": 42.42, + "humidity": 84.84, + }, + } + egress = { + "id": "foobar", + "meta": { + "date": 1655203344.0, + }, + "data": { + "temperature": 42.42, + "humidity": 84.84, + }, + } + + +def test_value_converter_datetime_function_reference(): + """ + Verify value conversion with function reference to built-in transformer. 
+ """ + engine = ValueConverter() + engine.add(pointer="/meta/american_date", transformer="to_datetime") + engine.add(pointer="/meta/american_date_time", transformer="to_datetime") + engine.add(pointer="/meta/empty_date", transformer="to_datetime") + engine.add(pointer="/meta/international_date", transformer="to_datetime") + engine.add(pointer="/meta/invalid_date", transformer="to_datetime") + engine.add(pointer="/meta/none_date", transformer="to_datetime") + engine.add(pointer="/meta/null_date", transformer="to_datetime") + + indata = deepcopy(ReadingWithTimestamps.ingress) + outdata = engine.apply(indata) + assert outdata == ReadingWithTimestamps.egress + + +def test_value_converter_datetime_function_callback(): + """ + Verify value conversion with function callback. + + Note: This use-case is discouraged, because an inline callback can't + be serialized into a text representation well. + """ + engine = ValueConverter() + from commons_codec.transform.function import to_datetime + + engine.add(pointer="/meta/american_date", transformer=to_datetime) + indata = deepcopy(ReadingWithTimestamps.ingress) + outdata = engine.apply(indata) + assert outdata["meta"]["american_date"] == ReadingWithTimestamps.egress["meta"]["american_date"] + + +def test_value_converter_root_node_yaml_dump(): + """ + Converting values on the root level of the document. + """ + engine = ValueConverter() + engine.add(pointer="", transformer="yaml.dump") + assert engine.apply({"value": 42}) == "value: 42\n" + + +def test_value_converter_root_node_extract_and_convert(): + """ + Converting values on the root level of the document. + """ + engine = ValueConverter() + engine.add(pointer="", transformer="operator.itemgetter", args=["value"]) + engine.add(pointer="", transformer="builtins.str") + assert engine.apply({"value": 42}) == "42" + + +def test_value_converter_path_invalid(): + """ + Converting values with an invalid location pointer fails. 
+ """ + engine = ValueConverter() + with pytest.raises(ValueError) as ex: + engine.add(pointer="---", transformer="to_datetime") + assert ex.match("Location must start with /") + + +def test_value_converter_transformer_empty(): + """ + Converting values with an empty transformer reference fails. + """ + engine = ValueConverter() + with pytest.raises(ValueError) as ex: + engine.add(pointer="/foo", transformer="") + assert ex.match("Empty transformer reference") + + +def test_value_converter_transformer_unknown_module(): + """ + Converting values with an unknown transformer module fails. + """ + engine = ValueConverter() + with pytest.raises(ImportError) as ex: + engine.add(pointer="/foo", transformer="foo.to_unknown") + assert ex.match("No module named 'foo'") + + +def test_value_converter_transformer_unknown_symbol(): + """ + Converting values with an unknown transformer symbol fails. + """ + engine = ValueConverter() + with pytest.raises(AttributeError) as ex: + engine.add(pointer="/foo", transformer="to_unknown") + assert ex.match("module 'commons_codec.transform.function' has no attribute 'to_unknown'") + + +def test_bucket_transformation_success(): + """ + Converting values with a complete transformation description. + """ + transformation = BucketTransformation( + names=FieldRenamer().add(old="_id", new="id"), + values=ValueConverter().add(pointer="/meta/date", transformer="to_unixtime"), + ) + result = transformation.apply(deepcopy(BasicReading.ingress)) + assert result == BasicReading.egress + + +def test_bucket_transformation_transon_compute(): + """ + Converting documents using a `transon` transformation. 
+ https://transon-org.github.io/ + """ + transformation = BucketTransformation( + transon=TransonTransformation().add( + pointer="/abc", template={"$": "call", "name": "str", "value": {"$": "expr", "op": "mul", "value": 2}} + ), + ) + result = transformation.apply({"abc": 123}) + assert result == {"abc": "246"} + + +def test_bucket_transformation_transon_filter(): + """ + Converting documents using a `transon` transformation. + https://transon-org.github.io/ + """ + transformation = BucketTransformation( + transon=TransonTransformation().add( + pointer="", template={"$": "filter", "cond": {"$": "expr", "op": "!=", "values": [{"$": "key"}, "baz"]}} + ), + ) + result = transformation.apply({"foo": "bar", "baz": "qux", "123": "456"}) + assert result == {"foo": "bar", "123": "456"} + + +def test_bucket_transformation_success_2(): + """ + Running a transformation without any manipulations yields the original input value. + """ + transformation = BucketTransformation() + result = transformation.apply(deepcopy(BasicReading.ingress)) + assert result == BasicReading.ingress + + +def test_bucket_transformation_serialize(): + """ + A transformation description can be serialized to a data structure and back. 
+ """ + transformation = BucketTransformation( + schema=SchemaDefinition().add(pointer="/meta/date", type="DATETIME"), + names=FieldRenamer().add(old="_id", new="id"), + values=ValueConverter().add(pointer="/meta/date", transformer="to_unixtime"), + ) + transformation_dict = { + "meta": {"version": 1, "type": "zyp-bucket"}, + "schema": {"map": {"/meta/date": "DATETIME"}, "rules": [{"name": "/meta/date", "type": "DATETIME"}]}, + "names": {"rules": [{"new": "id", "old": "_id"}]}, + "values": {"rules": [{"pointer": "/meta/date", "transformer": "to_unixtime"}]}, + } + result = transformation.to_dict() + assert result == transformation_dict + + result = transformation.to_json() + assert json.loads(result) == transformation_dict + + +def test_bucket_transformation_serialize_args(): + """ + Check if transformer args are also serialized. + """ + transformation = BucketTransformation( + values=ValueConverter().add(pointer="", transformer="operator.itemgetter", args=["value"]), + ) + result = transformation.to_dict() + transformation_dict = { + "meta": {"version": 1, "type": "zyp-bucket"}, + "values": {"rules": [{"pointer": "", "transformer": "operator.itemgetter", "args": ["value"]}]}, + } + assert result == transformation_dict + + +def test_bucket_transformation_load_and_apply(): + """ + Verify transformation can be loaded from JSON and applied again. + """ + payload = Path("tests/assets/transformation.json").read_text() + transformation = BucketTransformation.from_json(payload) + result = transformation.apply(deepcopy(BasicReading.ingress)) + assert result == BasicReading.egress + + +def test_moksha_jq_compute_nested(): + """ + Verify updating deeply nested field with value, using moksha/jq. 
    https://stackoverflow.com/a/65822084
    """
    transformation = MokshaTransformation().jq(".[] |= (.data.abc *= 2)")
    assert transformation.apply([{"data": {"abc": 123}}]) == [{"data": {"abc": 246}}]


def test_collection_transformation_success():
    """
    Verify transformation recipe for re-shaping a collection of records.
    """
    assert ComplexRecipe.transformation.apply(ComplexRecipe.data_in) == ComplexRecipe.data_out


class ComplexRecipe:
    """
    Fixture bundling a messy input collection, the expected cleansed output,
    and the transformation recipe that maps one to the other.

    It executes the following steps, in order of appearance:

    - Unwrap `records` attribute from container dictionary into actual collection.
    - Filter collection, both by omitting invalid/empty records, and by applying query constraints.
    - On each record, rename the top-level `_id` field to `id`.
    - On each record, apply value conversions to two nested data values.
    - Postprocess collection, applying a custom value scaling factor.
    """

    # Define a messy input data collection.
    data_in = {
        "message-source": "system-3000",
        "message-type": "eai-warehouse",
        "records": [
            {"_id": "12", "meta": {"name": "foo", "location": "B"}, "data": {"value": "4242"}},
            None,
            {"_id": "34", "meta": {"name": "bar", "location": "BY"}, "data": {"value": -8401}},
            {"_id": "56", "meta": {"name": "baz", "location": "NI"}, "data": {"value": 2323}},
            {"_id": "78", "meta": {"name": "qux", "location": "NRW"}, "data": {"value": -580}},
            None,
            None,
        ],
    }

    # Define expectation of the cleansed data collection.
    data_out = [
        {"id": 12, "meta": {"name": "foo", "location": "B"}, "data": {"value": 42.42}},
        {"id": 34, "meta": {"name": "bar", "location": "BY"}, "data": {"value": -84.01}},
    ]

    # Define transformation.
    transformation = CollectionTransformation(
        pre=MokshaTransformation().jmes("records[?not_null(meta.location) && !starts_with(meta.location, 'N')]"),
        bucket=BucketTransformation(
            names=FieldRenamer().add(old="_id", new="id"),
            values=ValueConverter()
            .add(pointer="/id", transformer="builtins.int")
            .add(pointer="/data/value", transformer="builtins.float"),
        ),
        post=MokshaTransformation().jq(".[] |= (.data.value /= 100)"),
    )


def test_collection_transformation_serialize():
    """
    Verify collection transformation description can be serialized to a data structure.
    """
    transformation = ComplexRecipe.transformation
    transformation_dict = {
        "meta": {"version": 1, "type": "zyp-collection"},
        "pre": {
            "rules": [
                {"type": "jmes", "expression": "records[?not_null(meta.location) && !starts_with(meta.location, 'N')]"}
            ]
        },
        "bucket": {
            "meta": {"version": 1, "type": "zyp-bucket"},
            "names": {"rules": [{"old": "_id", "new": "id"}]},
            "values": {
                "rules": [
                    {"pointer": "/id", "transformer": "builtins.int"},
                    {"pointer": "/data/value", "transformer": "builtins.float"},
                ]
            },
        },
        "post": {"rules": [{"type": "jq", "expression": ".[] |= (.data.value /= 100)"}]},
    }
    dict_result = transformation.to_dict()
    assert dict_result == transformation_dict
    # NOTE(review): this bare `return` makes the YAML round-trip checks below
    # unreachable, so only the dict form is verified. Confirm whether that is
    # intentional; otherwise drop the `return`.
    return

    yaml_result = transformation.to_yaml()
    assert yaml.full_load(yaml_result) == transformation_dict
    CollectionTransformation.from_yaml(yaml_result)


def test_collection_transformation_load_and_apply():
    """
    Verify transformation can be loaded from YAML and applied again.
    """
    payload = Path("tests/assets/transformation.yaml").read_text()
    transformation = CollectionTransformation.from_yaml(payload)
    result = transformation.apply(deepcopy(ComplexRecipe.data_in))
    assert result == ComplexRecipe.data_out


def test_transon_duplicate_records():
    """
    Verify record duplication works well.
    """
    transformation = MokshaTransformation().transon({"$": "expr", "op": "mul", "value": 42})
    assert transformation.apply([{"foo": "bar", "baz": "qux"}]) == [{"foo": "bar", "baz": "qux"}] * 42


def test_bucket_transon_marshal():
    """
    Verify a bucket transformation with a `transon` rule can be dumped to YAML
    and loaded back without raising.
    """
    transformation = BucketTransformation(
        transon=TransonTransformation().add(
            pointer="/abc", template={"$": "call", "name": "str", "value": {"$": "expr", "op": "mul", "value": 2}}
        ),
    )
    BucketTransformation.from_yaml(transformation.to_yaml())


def test_transon_idempotency():
    """
    Verify the `{"$": "this"}` transon template returns the input collection unchanged.
    """
    transformation = MokshaTransformation().transon({"$": "this"})
    assert transformation.apply([{"foo": "bar"}, {"baz": "qux"}]) == [{"foo": "bar"}, {"baz": "qux"}]


def test_moksha_rule():
    """
    Verify compiling a `jmes` rule yields a transformer exposing the source
    expression and its parsed form.
    """
    moksha = MokshaRule(type="jmes", expression="@").compile()
    assert moksha.transformer.expression == "@"
    assert moksha.transformer.parsed == {"type": "current", "children": []}


def test_moksha_runtime_rule_success():
    """
    Verify evaluating the JMESPath current-node expression returns the input value.
    """
    assert MokshaRule(type="jmes", expression="@").compile().evaluate(42.42) == 42.42


def test_moksha_runtime_rule_syntax_error():
    """
    Verify an invalid JMESPath expression fails at compile time.
    """
    with pytest.raises(ParseError) as ex:
        MokshaRule(type="jmes", expression="@foo").compile()
    assert ex.match("Unexpected token: foo")


def test_moksha_runtime_rule_invalid_transformer():
    # Swap the compiled transformer for an unsupported object to exercise the
    # error branch of `evaluate`.
    rule = MokshaRule(type="jmes", expression="@").compile()
    rule.transformer = "foo"
    with pytest.raises(TypeError) as ex:
        rule.evaluate(42.42)
    assert ex.match("Evaluation failed. 
Type must be either jmes or jq or transon: foo") + + +def test_moksha_empty(): + with pytest.raises(ValueError) as ex: + MokshaTransformation().jmes("") + assert ex.match("JMESPath expression cannot be empty") + + with pytest.raises(ValueError) as ex: + MokshaTransformation().jq("") + assert ex.match("jq expression cannot be empty") + + with pytest.raises(ValueError) as ex: + MokshaTransformation().transon("") + assert ex.match("transon expression cannot be empty") + + +def test_from_dict(): + assert isinstance(BucketTransformation.from_dict({}), BucketTransformation) + assert isinstance(CollectionTransformation.from_dict({}), CollectionTransformation) diff --git a/tests/zyp/__init__.py b/tests/zyp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/zyp/test_model.py b/tests/zyp/test_model.py new file mode 100644 index 0000000..3a8baa1 --- /dev/null +++ b/tests/zyp/test_model.py @@ -0,0 +1,15 @@ +from commons_codec.transform.model import FluentTransformation + + +def test_fluent_transformation(): + """ + FIXME: Fluent transformations are not implemented yet. 
+ """ + transformation = ( + FluentTransformation() + .jmes("records[?starts_with(location, 'B')]") + .rename_fields({"_id": "id"}) + .convert_values({"/id": "int", "/value": "float"}, type="pointer-python") + .jq(".[] |= (.value /= 100)") + ) + assert len(transformation.rules) == 2 diff --git a/tests/zyp/test_util.py b/tests/zyp/test_util.py new file mode 100644 index 0000000..7e45661 --- /dev/null +++ b/tests/zyp/test_util.py @@ -0,0 +1,49 @@ +import jmespath.parser +import jq +import pytest +import transon +from commons_codec.transform.model import compile_expression, to_pointer +from jsonpointer import JsonPointer + + +def test_to_pointer_string(): + assert to_pointer("/") == JsonPointer("/") + assert to_pointer("") == JsonPointer("") + + +def test_to_pointer_jsonpointer(): + assert to_pointer(JsonPointer("/")) == JsonPointer("/") + + +def test_to_pointer_none(): + with pytest.raises(TypeError) as ex: + to_pointer(None) + assert ex.match("Value is not of type str or JsonPointer: NoneType") + + +def test_to_pointer_int(): + with pytest.raises(TypeError) as ex: + to_pointer(42) + assert ex.match("Value is not of type str or JsonPointer: int") + + +def test_compile_expression_jmes(): + transformer: jmespath.parser.ParsedResult = compile_expression(type="jmes", expression="@") + assert transformer.expression == "@" + assert transformer.parsed == {"type": "current", "children": []} + + +def test_compile_expression_jq(): + transformer: jq._Program = compile_expression(type="jq", expression=".") + assert transformer.program_string == "." + + +def test_compile_expression_transon(): + transformer: transon.Transformer = compile_expression(type="transon", expression={"$": "this"}) + assert transformer.template == {"$": "this"} + + +def test_compile_expression_unknown(): + with pytest.raises(TypeError) as ex: + compile_expression(type="foobar", expression=None) + assert ex.match("Compilation failed. Type must be either jmes or jq or transon: foobar")