wip - feat(config): Add oauth cb provider #278

Closed
121 changes: 65 additions & 56 deletions metadata-ingestion/setup.py
@@ -98,24 +98,24 @@
}

sql_common = (
    {
        # Required for all SQL sources.
        # This is a temporary lower bound that we're open to loosening/tightening as requirements show up
        "sqlalchemy>=1.4.39, <2",
        # Required for SQL profiling.
        "great-expectations>=0.15.12, <=0.15.50",
        *pydantic_no_v2,  # because of great-expectations
        # scipy version restricted to reduce backtracking; used by great-expectations
        "scipy>=1.7.2",
        # GE added handling for higher versions of jinja2
        # https://github.com/great-expectations/great_expectations/pull/5382/files
        # datahub does not depend on traitlets directly but great expectations does.
        # https://github.com/ipython/traitlets/issues/741
        "traitlets<5.2.2",
        "greenlet",
    }
    | usage_common
    | sqlglot_lib
)

sqllineage_lib = {
@@ -134,6 +134,14 @@
"botocore!=1.23.0",
}

aws_msk_iam_sasl_signer = {
    # AWS MSK IAM SASL Signer
    "aws-msk-iam-sasl-signer-python",
    # Deal with a version incompatibility between botocore (used by boto3) and urllib3.
    # See https://github.com/boto/botocore/pull/2563.
    "botocore!=1.23.0",
}

path_spec_common = {
"parse>=1.19.0",
"wcmatch",
@@ -286,14 +294,14 @@
# sqlalchemy-bigquery is included here since it provides an implementation of
# a SQLalchemy-conform STRUCT type definition
"athena": sql_common
| {"PyAthena[SQLAlchemy]>=2.6.0,<3.0.0", "sqlalchemy-bigquery>=1.4.1"},
| {"PyAthena[SQLAlchemy]>=2.6.0,<3.0.0", "sqlalchemy-bigquery>=1.4.1"},
"azure-ad": set(),
"bigquery": sql_common
    | bigquery_common
    | {
        *sqlglot_lib,
        "google-cloud-datacatalog-lineage==0.2.2",
    },
"clickhouse": sql_common | clickhouse_common,
"clickhouse-usage": sql_common | usage_common | clickhouse_common,
"datahub-lineage-file": set(),
@@ -317,19 +325,19 @@
"glue": aws_common,
# hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported
"hana": sql_common
    | {
        "sqlalchemy-hana>=0.5.0; platform_machine != 'aarch64' and platform_machine != 'arm64'",
        "hdbcli>=2.11.20; platform_machine != 'aarch64' and platform_machine != 'arm64'",
    },
"hive": sql_common
    | pyhive_common
    | {
        "databricks-dbapi",
        # Due to https://github.com/great-expectations/great_expectations/issues/6146,
        # we cannot allow 0.15.{23-26}. This was fixed in 0.15.27 by
        # https://github.com/great-expectations/great_expectations/pull/6149.
        "great-expectations != 0.15.23, != 0.15.24, != 0.15.25, != 0.15.26",
    },
"iceberg": iceberg_common,
"json-schema": set(),
"kafka": kafka_common | kafka_protobuf,
@@ -342,10 +350,10 @@
"mode": {"requests", "tenacity>=8.0.1"} | sqllineage_lib,
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
"mssql": sql_common
    | {
        "sqlalchemy-pytds>=0.3",
        "pyOpenSSL",
    },
"mssql-odbc": sql_common | {"pyodbc"},
"mysql": mysql,
# mariadb should have same dependency as mysql
@@ -355,15 +363,15 @@
"postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"presto": sql_common | pyhive_common | trino,
"presto-on-hive": sql_common
    | pyhive_common
    | {"psycopg2-binary", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
"redshift": sql_common
    | redshift_common
    | usage_common
    | sqlglot_lib
    | {"cachetools"},
"s3": {*s3_base, *data_lake_profiling},
"gcs": {*s3_base, *data_lake_profiling},
"sagemaker": aws_common,
@@ -381,9 +389,9 @@
# to remove that dependency.
"tableau": {"tableauserverclient>=0.17.0"} | sqllineage_lib | sqlglot_lib,
"teradata": sql_common
    | usage_common
    | sqlglot_lib
    | {"teradatasqlalchemy>=17.20.0.0"},
"trino": sql_common | trino,
"starburst-trino-usage": sql_common | usage_common | trino,
"nifi": {"requests", "packaging", "requests-gssapi"},
@@ -731,13 +739,13 @@
| (
plugin_common
if plugin
not in {
    "airflow",
    "datahub-rest",
    "datahub-kafka",
    "sync-file-emitter",
    "sql-parser",
}
else set()
)
| dependencies
@@ -754,6 +762,7 @@
)
),
"cloud": ["acryl-datahub-cloud"],
"aws-msk-iam-sasl-singer": list(aws_msk_iam_sasl_signer),
"dev": list(dev_requirements),
"testing-utils": list(test_api_requirements), # To import `datahub.testing`
"integration-tests": list(full_test_dev_requirements),
11 changes: 11 additions & 0 deletions metadata-ingestion/src/datahub/configuration/kafka.py
@@ -2,6 +2,8 @@

from datahub.configuration.common import ConfigModel
from datahub.configuration.validate_host_port import validate_host_port
from datahub.ingestion.api.registry import import_path
from datahub.utilities.oauth_cb_providers.base_oauth_cb_provider import (
    BaseOAuthCbProvider,
)


class _KafkaConnectionConfig(ConfigModel):
@@ -36,6 +38,10 @@ class KafkaConsumerConnectionConfig(_KafkaConnectionConfig):
description="Extra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md .",
)

if "oauth_cb_provider_class" in consumer_config:
oauth_cb_provider_class: BaseOAuthCbProvider = import_path(consumer_config["oauth_cb_provider_class"])
consumer_config["oauth_cb"] = oauth_cb_provider_class.oauth_cb


class KafkaProducerConnectionConfig(_KafkaConnectionConfig):
"""Configuration class for holding connectivity information for Kafka producers"""
@@ -44,3 +50,8 @@ class KafkaProducerConnectionConfig(_KafkaConnectionConfig):
default_factory=dict,
description="Extra producer config serialized as JSON. These options will be passed into Kafka's SerializingProducer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#serializingproducer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md .",
)

if "oauth_cb_provider_class" in producer_config:
oauth_cb_provider_class: BaseOAuthCbProvider = import_path(producer_config["oauth_cb_provider_class"])
producer_config["oauth_cb"] = oauth_cb_provider_class.oauth_cb

Empty file.
@@ -0,0 +1,28 @@
# Copyright 2021 Acryl Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from datahub.utilities.oauth_cb_providers.base_oauth_cb_provider import (
    BaseOAuthCbProvider,
)


class AwsMskIamSaslSignerCbProvider(BaseOAuthCbProvider):
    @staticmethod
    def oauth_cb(oauth_config):
        # Fail fast if the region is missing; default AWS_DEBUG_CREDS to "false"
        # so an unset env var doesn't raise AttributeError on .lower().
        aws_region = os.environ["AWS_REGION"]
        aws_debug_creds = os.environ.get("AWS_DEBUG_CREDS", "false").lower() == "true"
        auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(
            aws_region, aws_debug_creds=aws_debug_creds
        )
        # Note that this library expects oauth_cb to return expiry time in seconds
        # since epoch, while the token generator returns expiry in ms.
        return auth_token, expiry_ms / 1000
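
For a quick sanity check outside DataHub, the same callback can be handed directly to a confluent-kafka client. A sketch, assuming AWS_REGION is set; the broker address is made up and the provider's module path is assumed:

from confluent_kafka import Consumer

from datahub.utilities.oauth_cb_providers.aws_msk_iam_sasl_signer_cb_provider import (
    AwsMskIamSaslSignerCbProvider,  # module path assumed from the imports above
)

# librdkafka invokes oauth_cb whenever it needs a fresh OAUTHBEARER token.
consumer = Consumer(
    {
        "bootstrap.servers": "b-1.example.amazonaws.com:9098",  # hypothetical
        "group.id": "datahub-ingestion",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "OAUTHBEARER",
        "oauth_cb": AwsMskIamSaslSignerCbProvider.oauth_cb,
    }
)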
@@ -0,0 +1,21 @@
# Copyright 2021 Acryl Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod


class BaseOAuthCbProvider(ABC):
    @staticmethod
    @abstractmethod
    def oauth_cb(oauth_config):
        """Return an (auth_token, expiry_in_seconds_since_epoch) tuple for OAUTHBEARER."""
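
To illustrate the contract, a provider only has to return that (token, expiry) pair. A minimal hypothetical implementation that serves a pre-issued token from an environment variable (the variable name is made up):

import os
import time

from datahub.utilities.oauth_cb_providers.base_oauth_cb_provider import (
    BaseOAuthCbProvider,
)


class StaticTokenCbProvider(BaseOAuthCbProvider):
    @staticmethod
    def oauth_cb(oauth_config):
        # Pretend the pre-issued token stays valid for five minutes from now;
        # the expiry must be expressed in seconds since the epoch.
        return os.environ["KAFKA_STATIC_OAUTH_TOKEN"], time.time() + 300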