Skip to content

Commit

Permalink
MongoDB Full: Refactor transformation subsystem to commons-codec
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 18, 2024
1 parent 79137c7 commit 0738f02
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 138 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


## Unreleased
- MongoDB Full: Refactor transformation subsystem to `commons-codec`

## 2024/09/16 v0.0.23
- MongoDB: Unlock processing multiple collections, either from server database,
Expand Down
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/mongodb/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import pymongo
import sqlalchemy as sa
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB
from commons_codec.transform.mongodb import MongoDBCDCTranslator

from cratedb_toolkit.util import DatabaseAdapter

Expand All @@ -35,7 +35,7 @@ def __init__(
self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(mongodb_url)
self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection]
self.table_name = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name)
self.cdc = MongoDBCDCTranslator(table_name=self.table_name)

Check warning on line 38 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L38

Added line #L38 was not covered by tests

def start(self):
"""
Expand Down
49 changes: 3 additions & 46 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@

import sqlalchemy as sa
from boltons.urlutils import URL
from commons_codec.model import SQLOperation
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB
from pymongo.cursor import Cursor
from commons_codec.transform.mongodb import MongoDBCrateDBConverter, MongoDBFullLoadTranslator
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from zyp.model.collection import CollectionAddress

from cratedb_toolkit.io.core import BulkProcessor
from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
from cratedb_toolkit.io.mongodb.export import CrateDBConverter
from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
Expand All @@ -23,45 +19,6 @@
logger = logging.getLogger(__name__)


class MongoDBFullLoadTranslator(MongoDBCDCTranslatorCrateDB):
"""
Translate a MongoDB document into a CrateDB document.
"""

def __init__(self, table_name: str, converter: CrateDBConverter, tm: TransformationManager = None):
super().__init__(table_name=table_name)
self.converter = converter
self.tm = tm

@staticmethod
def get_document_key(record: t.Dict[str, t.Any]) -> str:
"""
Return value of document key (MongoDB document OID) from CDC record.
"documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")}
"""
return record["_id"]

def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperation:
"""
Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents.
"""
if not isinstance(data, Cursor) and not isinstance(data, list):
data = [data]

# Define SQL INSERT statement.
sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);"

# Converge multiple MongoDB documents into SQL parameters for `executemany` operation.
parameters: t.List[DocumentDict] = []
for document in data:
record = self.converter.convert(self.decode_bson(document))
oid: str = self.get_document_key(record)
parameters.append({"oid": oid, "record": record})

return SQLOperation(sql, parameters)


class MongoDBFullLoad:
"""
Copy MongoDB collection into CrateDB table.
Expand Down Expand Up @@ -102,8 +59,8 @@ def __init__(
transformation = tm.project.get(address=address)
except KeyError:
pass
self.converter = CrateDBConverter(transformation=transformation)
self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, converter=self.converter, tm=tm)
self.converter = MongoDBCrateDBConverter(transformation=transformation)
self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, converter=self.converter)

self.on_error = on_error
self.progress = progress
Expand Down
81 changes: 2 additions & 79 deletions cratedb_toolkit/io/mongodb/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,102 +24,25 @@
Export the documents from a MongoDB collection as JSON, to be ingested into CrateDB.
"""

import base64
import calendar
import logging
import typing as t
from uuid import UUID

import bsonjs
import dateutil.parser as dateparser
import orjson as json
import pymongo.collection
from attr import Factory
from attrs import define
from zyp.model.collection import CollectionTransformation
from commons_codec.transform.mongodb import MongoDBCrateDBConverter

from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.io.mongodb.util import sanitize_field_names

logger = logging.getLogger(__name__)


def date_converter(value):
if isinstance(value, int):
return value
dt = dateparser.parse(value)
return calendar.timegm(dt.utctimetuple()) * 1000


def timestamp_converter(value):
if len(str(value)) <= 10:
return value * 1000
return value


type_converter = {
"date": date_converter,
"timestamp": timestamp_converter,
"undefined": lambda x: None,
}


@define
class CrateDBConverter:
transformation: CollectionTransformation = Factory(CollectionTransformation)

def convert(self, data: DocumentDict) -> t.Dict[str, t.Any]:
"""
Decode MongoDB Extended JSON, considering CrateDB specifics.
"""
return self.extract_value(data)

def extract_value(self, value: t.Any, parent_type: t.Optional[str] = None) -> t.Any:
"""
Decode MongoDB Extended JSON.
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
"""
if isinstance(value, dict):
# Custom adjustments to compensate shape anomalies in source data.
self.apply_special_treatments(value)
if len(value) == 1:
if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]:
decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"])))
return self.extract_value(decoded, parent_type)
for k, v in value.items():
if k.startswith("$"):
return self.extract_value(v, k.lstrip("$"))
return {k.lstrip("$"): self.extract_value(v, parent_type) for (k, v) in value.items()}
if isinstance(value, list):
return [self.extract_value(v, parent_type) for v in value]
if parent_type:
converter = type_converter.get(parent_type)
if converter:
return converter(value)
return value

def apply_special_treatments(self, value: t.Any):
"""
Apply special treatments to value that can't be described otherwise up until now.
# Ignore certain items including anomalies that are not resolved, yet.
TODO: Needs an integration test feeding two records instead of just one.
"""

if self.transformation is None or self.transformation.treatment is None:
return None

return self.transformation.treatment.apply(value)


def convert(d):
"""
Decode MongoDB Extended JSON, considering CrateDB specifics.
"""
converter = CrateDBConverter()
converter = MongoDBCrateDBConverter()

Check warning on line 45 in cratedb_toolkit/io/mongodb/export.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/export.py#L45

Added line #L45 was not covered by tests
newdict = {}
for k, v in sanitize_field_names(d).items():
newdict[k] = converter.convert(v)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ kinesis = [
"lorrystream[carabas]>=0.0.6",
]
mongodb = [
"commons-codec[mongodb,zyp]>=0.0.15",
"commons-codec[mongodb,zyp] @ git+https://github.com/crate/commons-codec.git@mongodb-full-next",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<5,>=3.10.1",
Expand Down
4 changes: 1 addition & 3 deletions tests/io/mongodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,7 @@ def test_mongodb_copy_filesystem_json_canonical(caplog, cratedb):
"SELECT pg_typeof(data['publishedDate']) AS type FROM testdrive.demo;", records=True
)
timestamp_type = type_result[0]["type"]

# FIXME: Why does the "canonical format" yield worse results?
assert timestamp_type == "text"
assert timestamp_type == "bigint"


def test_mongodb_copy_filesystem_bson(caplog, cratedb):
Expand Down
14 changes: 7 additions & 7 deletions tests/io/mongodb/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from zyp.model.collection import CollectionTransformation
from zyp.model.treatment import Treatment

from cratedb_toolkit.io.mongodb.export import CrateDBConverter
from cratedb_toolkit.io.mongodb.export import MongoDBCrateDBConverter

pytestmark = pytest.mark.mongodb

Expand Down Expand Up @@ -33,8 +33,8 @@ def test_convert_basic():
],
},
}
converter = CrateDBConverter()
assert converter.convert(data_in) == data_out
converter = MongoDBCrateDBConverter()
assert converter.decode_document(data_in) == data_out


def test_convert_with_treatment_ignore_complex_list():
Expand All @@ -61,8 +61,8 @@ def test_convert_with_treatment_ignore_complex_list():
}

treatment = Treatment(ignore_complex_lists=True)
converter = CrateDBConverter(transformation=CollectionTransformation(treatment=treatment))
assert converter.convert(data_in) == data_out
converter = MongoDBCrateDBConverter(transformation=CollectionTransformation(treatment=treatment))
assert converter.decode_document(data_in) == data_out


def test_convert_with_treatment_all_options():
Expand Down Expand Up @@ -103,5 +103,5 @@ def test_convert_with_treatment_all_options():
{"name": "user", "wrapper_name": "id"},
],
)
converter = CrateDBConverter(transformation=CollectionTransformation(treatment=treatment))
assert converter.convert(data_in) == data_out
converter = MongoDBCrateDBConverter(transformation=CollectionTransformation(treatment=treatment))
assert converter.decode_document(data_in) == data_out

0 comments on commit 0738f02

Please sign in to comment.