MongoDB: Code refactoring and generalization #49

Merged
merged 10 commits into from
Sep 18, 2024
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
@@ -106,7 +106,7 @@ jobs:
pip install "setuptools>=64" --upgrade

# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[mongodb,develop,test]
pip install --use-pep517 --prefer-binary --editable=.[develop,test]

- name: Run linters and software tests
run: poe test -- -m 'dynamodb'
@@ -161,7 +161,7 @@ jobs:
pip install "setuptools>=64" --upgrade

# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[mongodb,develop,test]
pip install --use-pep517 --prefer-binary --editable=.[mongodb,zyp,develop,test]

- name: Run linters and software tests
run: poe test -- -m 'mongodb'
7 changes: 7 additions & 0 deletions CHANGES.md
@@ -1,6 +1,13 @@
# Changelog

## Unreleased
- MongoDB: Added `MongoDBFullLoadTranslator` and `MongoDBCrateDBConverter`
- Zyp: Fixed execution of collection transformation
- Zyp: Added software test and documentation about flattening lists
- MongoDB: Use `bson` package to parse BSON CANONICAL representation
- MongoDB: Complete and verify BSON data type mapping end-to-end
- MongoDB: Use improved decoding machinery also for `MongoDBCDCTranslator`
- Dependencies: Make MongoDB subsystem not strictly depend on Zyp

## 2024/09/10 v0.0.15
- Added Zyp Treatments, a slightly tailored transformation subsystem
50 changes: 47 additions & 3 deletions doc/zyp/index.md
@@ -20,7 +20,7 @@ implemented using [attrs] and [cattrs].
system before applying subsequent transformations. Zyp works directly within
the data pipeline, before data is inserted into the target system.

## Synopsis I
## Example I
A basic transformation example for individual data records.

```python
@@ -52,7 +52,7 @@ The result is a transformed data collection.
]
```

## Synopsis II
## Example II
A more advanced transformation example for a collection of data records.

Consider a messy collection of input data.
@@ -120,7 +120,7 @@ transformation = CollectionTransformation(
post=MokshaTransformation().jq(".[] |= (.data.value /= 100)"),
)

data_out = transformation.apply(data_in)
assert transformation.apply(data_in) == data_out
```
Alternatively, serialize the `zyp-collection` transformation description,
for example into YAML format.
@@ -152,6 +152,50 @@ post:
type: jq
```

## Example III
A compact transformation example that uses `jq` to:

- Unwrap the actual collection which is nested within the top-level `records` item.
- Flatten the item `nested-list` which contains nested lists.

```python
from zyp.model.collection import CollectionTransformation
from zyp.model.moksha import MokshaTransformation

data_in = {
"message-source": "system-3000",
"message-type": "eai-warehouse",
"records": [
{"nested-list": [{"foo": 1}, [{"foo": 2}, {"foo": 3}]]},
],
}

data_out = [
{"nested-list": [{"foo": 1}, {"foo": 2}, {"foo": 3}]},
]

transformation = CollectionTransformation(
pre=MokshaTransformation()
.jq(".records")
.jq('.[] |= (."nested-list" |= flatten)'),
)

assert transformation.apply(data_in) == data_out
```

The same transformation represented in YAML format looks like this.
```yaml
meta:
type: zyp-collection
version: 1
pre:
rules:
- expression: .records
type: jq
- expression: .[] |= (."nested-list" |= flatten)
type: jq
```
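For reference, jq's `flatten` builtin recursively flattens nested arrays. A minimal plain-Python sketch of the equivalent behavior (illustrative only, not part of Zyp) looks like this:

```python
def flatten(items):
    """Recursively flatten nested lists, mirroring jq's `flatten` builtin."""
    result = []
    for item in items:
        if isinstance(item, list):
            result.extend(flatten(item))
        else:
            result.append(item)
    return result


records = [{"nested-list": [{"foo": 1}, [{"foo": 2}, {"foo": 3}]]}]
flattened = [{"nested-list": flatten(r["nested-list"])} for r in records]
print(flattened)  # [{'nested-list': [{'foo': 1}, {'foo': 2}, {'foo': 3}]}]
```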


## Prior Art
- [Singer Transformer]
11 changes: 5 additions & 6 deletions examples/mongodb_cdc_cratedb.py
@@ -15,7 +15,7 @@
import sqlalchemy as sa
from sqlalchemy_cratedb.support import quote_relation_name

from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB
from commons_codec.transform.mongodb import MongoDBCDCTranslator


class MiniRelay:
@@ -35,18 +35,17 @@ def __init__(
self.mongodb_client = pymongo.MongoClient(mongodb_url)
self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection]
self.table_name = quote_relation_name(cratedb_table)
self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name)
self.cdc = MongoDBCDCTranslator(table_name=self.table_name)

def start(self):
"""
Subscribe to change stream events, convert to SQL, and submit to CrateDB.
"""
with self.cratedb_client.connect() as connection:
connection.execute(sa.text(self.cdc.sql_ddl))
for sql in self.cdc_to_sql():
if sql:
connection.execute(sa.text(sql))
connection.execute(sa.text(f"REFRESH TABLE {self.table_name};"))
for operation in self.cdc_to_sql():
connection.execute(sa.text(operation.statement), parameters=operation.parameters)
connection.execute(sa.text(f"REFRESH TABLE {self.table_name};"))

def cdc_to_sql(self):
"""
1 change: 1 addition & 0 deletions pyproject.toml
@@ -107,6 +107,7 @@ dependencies = [
"attrs<25",
"backports-strenum<1.3; python_version<'3.11'",
"cattrs<25",
"python-dateutil<3",
"simplejson<4",
"sqlalchemy-cratedb>=0.39.0",
"toolz<0.13",
200 changes: 171 additions & 29 deletions src/commons_codec/transform/mongodb.py
@@ -1,20 +1,125 @@
# Copyright (c) 2023-2024, The Kotori Developers and contributors.
# Copyright (c) 2021-2024, Crate.io Inc.
# Distributed under the terms of the LGPLv3 license, see LICENSE.

# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction

# ruff: noqa: S608
import calendar
import datetime as dt
import logging
import typing as t
from typing import Iterable

from bson.json_util import _json_convert
import bson
import dateutil.parser as dateparser
from attrs import define
from bson.json_util import _json_convert, object_hook
from pymongo.cursor import Cursor
from sqlalchemy_cratedb.support import quote_relation_name

from commons_codec.model import SQLOperation

Document = t.Mapping[str, t.Any]
DocumentCollection = t.List[Document]


logger = logging.getLogger(__name__)


class MongoDBCDCTranslatorBase:
def date_converter(value):
if isinstance(value, int):
return value
elif isinstance(value, (str, bytes)):
datetime = dateparser.parse(value)
elif isinstance(value, dt.datetime):
datetime = value
else:
raise ValueError(f"Unable to convert datetime value: {value}")
return calendar.timegm(datetime.utctimetuple()) * 1000


@define
class MongoDBCrateDBConverter:
"""
Convert MongoDB Extended JSON to a representation consumable by CrateDB.

Extracted from cratedb-toolkit, earlier migr8.
"""

transformation: t.Any = None

def decode_documents(self, data: t.Iterable[Document]) -> Iterable[Document]:
"""
Decode MongoDB Extended JSON, considering CrateDB specifics.
"""
data = map(self.decode_value, data)
# TODO: This is currently untyped. Types are defined in Zyp, see `zyp.model.base`.
if self.transformation is not None:
data = self.transformation.apply(data)
return data

def decode_document(self, data: Document) -> Document:
"""
Decode MongoDB Extended JSON, considering CrateDB specifics.
"""
return self.decode_value(data)

def decode_value(self, value: t.Any) -> t.Any:
"""
Decode MongoDB Extended JSON.

- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
"""
if isinstance(value, dict):
# Decode item in BSON CANONICAL format.
if len(value) == 1:
return self.decode_canonical(value)

# Custom adjustments to compensate shape anomalies in source data.
self.apply_special_treatments(value)

return {k: self.decode_value(v) for (k, v) in value.items()}
elif isinstance(value, list):
return [self.decode_value(v) for v in value]

return value

@staticmethod
def decode_canonical(value: t.Dict[str, t.Any]) -> t.Any:
"""
Decode BSON CANONICAL representation.
"""
type_ = list(value.keys())[0]
# Special handling for datetime representation in NUMBERLONG format (emulated depth-first).
is_date_numberlong = type_ == "$date" and "$numberLong" in value["$date"]
if is_date_numberlong:
return int(object_hook(value["$date"]))
else:
value = object_hook(value)
is_bson = type(value).__module__.startswith("bson")
if isinstance(value, bson.Binary) and value.subtype == bson.UUID_SUBTYPE:
value = value.as_uuid()
if isinstance(value, bson.Timestamp):
value = value.as_datetime()
if isinstance(value, dt.datetime):
return date_converter(value)
if is_bson:
return str(value)
return value

def apply_special_treatments(self, value: t.Any):
"""
Apply special treatments to values that cannot be described otherwise up until now,
for example to ignore certain items including anomalies that are not resolved yet.

TODO: Needs an integration test feeding two records instead of just one.
"""

if self.transformation is None or self.transformation.treatment is None:
return None

return self.transformation.treatment.apply(value)


class MongoDBTranslatorBase:
"""
Translate MongoDB CDC events into different representations.

@@ -30,7 +135,27 @@ class MongoDBCDCTranslatorBase:
- https://www.mongodb.com/developer/languages/python/python-change-streams/
"""

def decode_bson(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
# Define name of the column where MongoDB's OID for a document will be stored.
ID_COLUMN = "oid"

# Define name of the column where CDC's record data will get materialized into.
DATA_COLUMN = "data"

def __init__(self, table_name: str, converter: t.Union[MongoDBCrateDBConverter, None] = None):
self.table_name = quote_relation_name(table_name)
self.converter = converter or MongoDBCrateDBConverter()

@property
def sql_ddl(self):
"""
Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
"""
return (
f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.ID_COLUMN} TEXT, {self.DATA_COLUMN} OBJECT(DYNAMIC));"
)

@staticmethod
def decode_bson(item: t.Mapping[str, t.Any]) -> t.Mapping[str, t.Any]:
"""
Convert MongoDB Extended JSON to vanilla Python dictionary.

@@ -62,7 +187,41 @@ def decode_bson(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
return _json_convert(item)


class MongoDBCDCTranslatorCrateDB(MongoDBCDCTranslatorBase):
class MongoDBFullLoadTranslator(MongoDBTranslatorBase):
"""
Translate a MongoDB document into a CrateDB document.
"""

@staticmethod
def get_document_key(record: t.Mapping[str, t.Any]) -> str:
"""
Return value of document key (MongoDB document OID).

"documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")}
"""
return record["_id"]

def to_sql(self, data: t.Union[Document, t.List[Document]]) -> SQLOperation:
"""
Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents.
"""
if not isinstance(data, Cursor) and not isinstance(data, list):
data = [data]

# Define SQL INSERT statement.
sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);"

# Converge multiple MongoDB documents into SQL parameters for `executemany` operation.
parameters: t.List[Document] = []
for document in data:
record = self.converter.decode_document(self.decode_bson(document))
oid: str = self.get_document_key(record)
parameters.append({"oid": oid, "record": record})

return SQLOperation(sql, parameters)


class MongoDBCDCTranslator(MongoDBTranslatorBase):
"""
Translate MongoDB CDC events into CrateDB SQL statements that materialize them again.

@@ -94,25 +253,6 @@ class MongoDBCDCTranslatorCrateDB(MongoDBCDCTranslatorBase):
CREATE TABLE <tablename> (oid TEXT, data OBJECT(DYNAMIC));
"""

# Define name of the column where MongoDB's OID for a document will be stored.
ID_COLUMN = "oid"

# Define name of the column where CDC's record data will get materialized into.
DATA_COLUMN = "data"

def __init__(self, table_name: str):
super().__init__()
self.table_name = quote_relation_name(table_name)

@property
def sql_ddl(self):
"""
Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
"""
return (
f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.ID_COLUMN} TEXT, {self.DATA_COLUMN} OBJECT(DYNAMIC));"
)

def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]:
"""
Produce INSERT|UPDATE|DELETE SQL statement from insert|update|replace|delete CDC event record.
@@ -125,15 +265,17 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]:

if operation_type == "insert":
oid: str = self.get_document_key(event)
record = self.decode_bson(self.get_full_document(event))
document = self.get_full_document(event)
record = self.converter.decode_document(self.decode_bson(document))
sql = f"INSERT INTO {self.table_name} " f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " "VALUES (:oid, :record);"
parameters = {"oid": oid, "record": record}

# In order to use "full document" representations from "update" events,
# you need to use `watch(full_document="updateLookup")`.
# https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations
elif operation_type in ["update", "replace"]:
record = self.decode_bson(self.get_full_document(event))
document = self.get_full_document(event)
record = self.converter.decode_document(self.decode_bson(document))
where_clause = self.where_clause(event)
sql = f"UPDATE {self.table_name} " f"SET {self.DATA_COLUMN} = :record " f"WHERE {where_clause};"
parameters = {"record": record}