feat(ingest): adding maxSchemaSize to mongodb source (#3153)
swaroopjagadish authored Aug 24, 2021
1 parent 370a885 commit 88afec7
Showing 5 changed files with 5,030 additions and 4 deletions.
4 changes: 4 additions & 0 deletions metadata-ingestion/source_docs/mongodb.md
@@ -18,6 +18,8 @@ Moreover, setting `useRandomSampling: False` will sample the first documents found

Note that `schemaSamplingSize` has no effect if `enableSchemaInference: False` is set.

Really large schemas will be further truncated to a maximum of 300 schema fields. This is configurable using the `maxSchemaSize` parameter.

## Quickstart recipe

Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
@@ -39,6 +41,7 @@ source:
# Options
enableSchemaInference: True
useRandomSampling: True
maxSchemaSize: 300

sink:
# sink configs
@@ -57,6 +60,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `options` | | | Additional options to pass to `pymongo.MongoClient()`. |
| `enableSchemaInference` | | `True` | Whether to infer schemas. |
| `schemaSamplingSize` | | `1000` | Number of documents to use when inferring schema size. If set to `0`, all documents will be scanned. |
| `maxSchemaSize` | | `300` | Maximum number of fields to include in the schema. |
| `useRandomSampling` | | `True` | If documents for schema inference should be randomly selected. If `False`, documents will be selected from start. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `database_pattern.allow` | | | List of regex patterns for databases to include in ingestion. |
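The options documented in mongodb.md above map onto a simple sampling strategy: pull up to `schemaSamplingSize` documents (randomly when `useRandomSampling` is true, otherwise the first documents the cursor returns) and tally which fields occur. The snippet below is a minimal sketch of that idea using plain pymongo; the helper name `sample_collection_fields`, the connection string, and the database/collection names are illustrative assumptions, and the real DataHub source infers full nested schemas rather than just counting top-level keys.

```python
from collections import Counter
from typing import Dict

from pymongo import MongoClient
from pymongo.collection import Collection


def sample_collection_fields(
    collection: Collection,
    sample_size: int = 1000,
    use_random_sampling: bool = True,
) -> Dict[str, int]:
    """Count how often each top-level field appears in a sample of documents.

    Illustrative only: the actual source infers field types and nested paths,
    not just top-level key counts.
    """
    if use_random_sampling:
        # $sample draws documents pseudo-randomly from the collection.
        cursor = collection.aggregate([{"$sample": {"size": sample_size}}])
    else:
        # Otherwise take the first documents the cursor returns.
        cursor = collection.find().limit(sample_size)

    field_counts: Counter = Counter()
    for document in cursor:
        field_counts.update(document.keys())
    return dict(field_counts)


if __name__ == "__main__":
    # Hypothetical local MongoDB instance, database, and collection.
    client = MongoClient("mongodb://localhost:27017")
    print(sample_collection_fields(client["mydb"]["mycollection"], sample_size=1000))
```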
41 changes: 37 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
@@ -1,8 +1,9 @@
import logging
from collections import Counter
from dataclasses import dataclass, field
from typing import Any
from typing import Counter as CounterType
from typing import Dict, Iterable, List, Optional, Tuple, Type, Union
from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, ValuesView

import bson
import pymongo
@@ -34,6 +35,8 @@
)
from datahub.metadata.schema_classes import DatasetPropertiesClass

logger = logging.getLogger(__name__)

# These are MongoDB-internal databases, which we want to skip.
# See https://docs.mongodb.com/manual/reference/local-database/ and
# https://docs.mongodb.com/manual/reference/config-database/ and
@@ -52,6 +55,7 @@ class MongoDBConfig(ConfigModel):
enableSchemaInference: bool = True
schemaSamplingSize: Optional[PositiveInt] = 1000
useRandomSampling: bool = True
maxSchemaSize: Optional[PositiveInt] = 300
env: str = DEFAULT_ENV

database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
@@ -422,8 +426,10 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
self.report.report_dropped(dataset_name)
continue

dataset_urn = f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{self.config.env})"

dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{self.config.env})",
urn=dataset_urn,
aspects=[],
)

@@ -444,10 +450,37 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

# initialize the schema for the collection
canonical_schema: List[SchemaField] = []

max_schema_size = self.config.maxSchemaSize
collection_schema_size = len(collection_schema.values())
collection_fields: Union[
List[SchemaDescription], ValuesView[SchemaDescription]
] = collection_schema.values()
assert max_schema_size is not None
if collection_schema_size > max_schema_size:
# downsample the schema, using frequency as the sort key
self.report.report_warning(
key=dataset_urn,
reason=f"Downsampling the collection schema because it has {collection_schema_size} fields. Threshold is {max_schema_size}",
)
collection_fields = sorted(
collection_schema.values(),
key=lambda x: x["count"],
reverse=True,
)[0:max_schema_size]
# Add this information to the custom properties so user can know they are looking at downsampled schema
dataset_properties.customProperties[
"schema.downsampled"
] = "True"
dataset_properties.customProperties[
"schema.totalFields"
] = f"{collection_schema_size}"

logger.debug(
f"Size of collection fields = {len(collection_fields)}"
)
# append each schema field (sort so output is consistent)
for schema_field in sorted(
collection_schema.values(), key=lambda x: x["delimited_name"]
collection_fields, key=lambda x: x["delimited_name"]
):
field = SchemaField(
fieldPath=schema_field["delimited_name"],
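The core of the change is the downsampling branch above: when the inferred schema has more fields than `maxSchemaSize`, fields are ranked by how often they occurred in the sampled documents and only the most frequent ones are kept, with `schema.downsampled` and `schema.totalFields` recorded in the dataset's custom properties. Below is a standalone sketch of just that selection logic, assuming each inferred field carries the `count` and `delimited_name` keys used in the diff; the function name `downsample_schema` and the example data are illustrative, and the reporting/custom-property bookkeeping is omitted.

```python
from typing import Dict, List


def downsample_schema(
    collection_schema: Dict[str, Dict],
    max_schema_size: int = 300,
) -> List[Dict]:
    """Keep at most max_schema_size fields, preferring the most frequent ones."""
    fields = list(collection_schema.values())
    if len(fields) > max_schema_size:
        # Rank by how many sampled documents contained the field, keep the top N.
        fields = sorted(fields, key=lambda f: f["count"], reverse=True)[:max_schema_size]
    # Sort the retained fields by name so the emitted schema is deterministic.
    return sorted(fields, key=lambda f: f["delimited_name"])


# Tiny illustrative schema: three fields with their sampled occurrence counts.
example_schema = {
    "_id": {"delimited_name": "_id", "count": 1000, "type": "oid"},
    "name": {"delimited_name": "name", "count": 990, "type": "string"},
    "nickname": {"delimited_name": "nickname", "count": 12, "type": "string"},
}
print(downsample_schema(example_schema, max_schema_size=2))
# Keeps "_id" and "name"; the rarely seen "nickname" is dropped.
```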
(Diffs for the remaining 3 changed files are not shown.)
