Skip to content

Commit

Permalink
refactor(ingestion/mongodb): Add platform_instance to mongodb (#8663)
Browse files Browse the repository at this point in the history
Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
nicholas-fwang and hsheth2 authored Oct 26, 2023
1 parent a96a512 commit 8522679
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
16 changes: 13 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from pymongo.mongo_client import MongoClient

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import EnvConfigMixin
from datahub.configuration.source_common import (
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
Expand Down Expand Up @@ -55,7 +59,7 @@
DENY_DATABASE_LIST = set(["admin", "config", "local"])


class MongoDBConfig(EnvConfigMixin):
class MongoDBConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
# See the MongoDB authentication docs for details and examples.
# https://pymongo.readthedocs.io/en/stable/examples/authentication.html
connect_uri: str = Field(
Expand Down Expand Up @@ -199,6 +203,7 @@ def construct_schema_pymongo(
@platform_name("MongoDB")
@config_class(MongoDBConfig)
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.SCHEMA_METADATA, "Enabled by default")
@dataclass
class MongoDBSource(Source):
Expand Down Expand Up @@ -320,7 +325,12 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.report.report_dropped(dataset_name)
continue

dataset_urn = f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{self.config.env})"
dataset_urn = make_dataset_urn_with_platform_instance(
platform=platform,
name=dataset_name,
env=self.config.env,
platform_instance=self.config.platform_instance,
)

dataset_snapshot = DatasetSnapshot(
urn=dataset_urn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.emptyCollection,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.emptyCollection,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
Expand Down Expand Up @@ -41,7 +41,7 @@
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.firstCollection,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.firstCollection,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
Expand Down Expand Up @@ -345,7 +345,7 @@
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.largeCollection,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.largeCollection,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
Expand Down Expand Up @@ -3988,7 +3988,7 @@
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.secondCollection,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.secondCollection,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
Expand Down Expand Up @@ -4135,7 +4135,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.emptyCollection,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.emptyCollection,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
Expand All @@ -4150,7 +4150,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.firstCollection,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.firstCollection,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
Expand All @@ -4165,7 +4165,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.largeCollection,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.largeCollection,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
Expand All @@ -4180,7 +4180,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,mngdb.secondCollection,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,instance.mngdb.secondCollection,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def test_mongodb_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time
"username": "mongoadmin",
"password": "examplepass",
"maxDocumentSize": 25000,
"platform_instance": "instance",
},
},
"sink": {
Expand Down

0 comments on commit 8522679

Please sign in to comment.