Merge branch 'master' into richenc-airflow-plugin-improvements
richenc authored Oct 24, 2023
2 parents bc14134 + 378d84a commit 432d510
Showing 19 changed files with 296 additions and 54 deletions.
@@ -67,8 +67,8 @@ export default function PolicyDetailsModal({ policy, visible, onClose, privilege
const isMetadataPolicy = policy?.type === PolicyType.Metadata;

const resources = convertLegacyResourceFilter(policy?.resources);
const resourceTypes = getFieldValues(resources?.filter, 'RESOURCE_TYPE') || [];
const resourceEntities = getFieldValues(resources?.filter, 'RESOURCE_URN') || [];
const resourceTypes = getFieldValues(resources?.filter, 'TYPE') || [];
const resourceEntities = getFieldValues(resources?.filter, 'URN') || [];
const domains = getFieldValues(resources?.filter, 'DOMAIN') || [];

const {
@@ -67,8 +67,8 @@ export default function PolicyPrivilegeForm({
} = useAppConfig();

const resources: ResourceFilter = convertLegacyResourceFilter(maybeResources) || EMPTY_POLICY.resources;
const resourceTypes = getFieldValues(resources.filter, 'RESOURCE_TYPE') || [];
const resourceEntities = getFieldValues(resources.filter, 'RESOURCE_URN') || [];
const resourceTypes = getFieldValues(resources.filter, 'TYPE') || [];
const resourceEntities = getFieldValues(resources.filter, 'URN') || [];

const getDisplayName = (entity) => {
if (!entity) {
@@ -145,10 +145,7 @@ export default function PolicyPrivilegeForm({
};
setResources({
...resources,
filter: setFieldValues(filter, 'RESOURCE_TYPE', [
...resourceTypes,
createCriterionValue(selectedResourceType),
]),
filter: setFieldValues(filter, 'TYPE', [...resourceTypes, createCriterionValue(selectedResourceType)]),
});
};

@@ -160,7 +157,7 @@ export default function PolicyPrivilegeForm({
...resources,
filter: setFieldValues(
filter,
'RESOURCE_TYPE',
'TYPE',
resourceTypes?.filter((criterionValue) => criterionValue.value !== deselectedResourceType),
),
});
@@ -173,7 +170,7 @@ export default function PolicyPrivilegeForm({
};
setResources({
...resources,
filter: setFieldValues(filter, 'RESOURCE_URN', [
filter: setFieldValues(filter, 'URN', [
...resourceEntities,
createCriterionValueWithEntity(
resource,
@@ -192,7 +189,7 @@ export default function PolicyPrivilegeForm({
...resources,
filter: setFieldValues(
filter,
'RESOURCE_URN',
'URN',
resourceEntities?.filter((criterionValue) => criterionValue.value !== resource),
),
});
4 changes: 2 additions & 2 deletions datahub-web-react/src/app/permissions/policy/policyUtils.ts
@@ -99,10 +99,10 @@ export const convertLegacyResourceFilter = (resourceFilter: Maybe<ResourceFilter
}
const criteria = new Array<PolicyMatchCriterion>();
if (resourceFilter.type) {
criteria.push(createCriterion('RESOURCE_TYPE', [createCriterionValue(resourceFilter.type)]));
criteria.push(createCriterion('TYPE', [createCriterionValue(resourceFilter.type)]));
}
if (resourceFilter.resources && resourceFilter.resources.length > 0) {
criteria.push(createCriterion('RESOURCE_URN', resourceFilter.resources.map(createCriterionValue)));
criteria.push(createCriterion('URN', resourceFilter.resources.map(createCriterionValue)));
}
return {
filter: {
8 changes: 4 additions & 4 deletions docs/authorization/policies.md
@@ -137,7 +137,7 @@ We currently support the following:
#### Resources

The resource filter defines the set of resources that the policy applies to using a list of criteria. Each
criterion defines a field type (like resource_type, resource_urn, domain), a list of field values to compare, and a
criterion defines a field type (like type, urn, domain), a list of field values to compare, and a
condition (like EQUALS). It essentially checks whether the field of a certain resource matches any of the input values.
Note that if there are no criteria or the resource is not set, the policy is applied to ALL resources.

@@ -149,7 +149,7 @@ For example, the following resource filter will apply the policy to datasets, ch
"filter": {
"criteria": [
{
"field": "RESOURCE_TYPE",
"field": "TYPE",
"condition": "EQUALS",
"values": [
"dataset",
@@ -175,8 +175,8 @@ Supported fields are as follows

| Field Type | Description | Example |
|---------------|------------------------|-------------------------|
| resource_type | Type of the resource | dataset, chart, dataJob |
| resource_urn | Urn of the resource | urn:li:dataset:... |
| type | Type of the resource | dataset, chart, dataJob |
| urn | Urn of the resource | urn:li:dataset:... |
| domain | Domain of the resource | urn:li:domain:domainX |
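As an illustration of the renamed criterion fields, a resource filter restricting a policy to datasets within a single domain might look like the following (shown as a Python dict mirroring the JSON shape above; the values are examples, not taken from this diff):

```python
# Hypothetical resource filter combining the renamed criterion fields:
# apply the policy only to datasets that belong to one domain.
resource_filter = {
    "filter": {
        "criteria": [
            {"field": "TYPE", "condition": "EQUALS", "values": ["dataset"]},
            {"field": "DOMAIN", "condition": "EQUALS", "values": ["urn:li:domain:domainX"]},
        ]
    }
}
```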

## Managing Policies
14 changes: 11 additions & 3 deletions metadata-ingestion/setup.py
@@ -282,7 +282,8 @@
# Source plugins
# sqlalchemy-bigquery is included here since it provides an implementation of
# a SQLalchemy-conform STRUCT type definition
"athena": sql_common | {"PyAthena[SQLAlchemy]>=2.6.0,<3.0.0", "sqlalchemy-bigquery>=1.4.1"},
"athena": sql_common
| {"PyAthena[SQLAlchemy]>=2.6.0,<3.0.0", "sqlalchemy-bigquery>=1.4.1"},
"azure-ad": set(),
"bigquery": sql_common
| bigquery_common
@@ -354,7 +355,11 @@
| {"psycopg2-binary", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
"redshift": sql_common | redshift_common | usage_common | sqlglot_lib | {"redshift-connector"},
"redshift": sql_common
| redshift_common
| usage_common
| sqlglot_lib
| {"redshift-connector"},
"redshift-legacy": sql_common | redshift_common,
"redshift-usage-legacy": sql_common | usage_common | redshift_common,
"s3": {*s3_base, *data_lake_profiling},
@@ -435,7 +440,9 @@
deepdiff_dep = "deepdiff"
test_api_requirements = {pytest_dep, deepdiff_dep, "PyYAML"}

debug_requirements = {"memray"}
debug_requirements = {
"memray",
}

base_dev_requirements = {
*base_requirements,
@@ -668,6 +675,7 @@
"Documentation": "https://datahubproject.io/docs/",
"Source": "https://github.com/datahub-project/datahub",
"Changelog": "https://github.com/datahub-project/datahub/releases",
"Releases": "https://github.com/acryldata/datahub/releases",
},
license="Apache License 2.0",
description="A CLI to work with DataHub metadata",
@@ -119,8 +119,8 @@ class BigQueryV2Config(
)

match_fully_qualified_names: bool = Field(
default=False,
description="Whether `dataset_pattern` is matched against fully qualified dataset name `<project_id>.<dataset_name>`.",
default=True,
description="[deprecated] Whether `dataset_pattern` is matched against fully qualified dataset name `<project_id>.<dataset_name>`.",
)

include_external_url: bool = Field(
@@ -327,8 +327,7 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict:
):
logger.warning(
"Please update `dataset_pattern` to match against fully qualified schema name `<project_id>.<dataset_name>` and set config `match_fully_qualified_names : True`."
"Current default `match_fully_qualified_names: False` is only to maintain backward compatibility. "
"The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`."
"The config option `match_fully_qualified_names` is deprecated and will be removed in a future release."
)
return values
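Since `match_fully_qualified_names` now defaults to `True`, `dataset_pattern` entries should target `<project_id>.<dataset_name>`. A sketch of what a source config could look like under that assumption (keys and names are illustrative, written as a Python dict rather than a recipe file):

```python
# Hypothetical BigQuery source config: dataset_pattern is evaluated against
# "<project_id>.<dataset_name>" now that match_fully_qualified_names defaults to True.
bigquery_source_config = {
    "project_ids": ["my-project"],
    "dataset_pattern": {
        "allow": [r"^my-project\.analytics$"],  # fully qualified dataset name
        "deny": [r".*\.tmp_.*"],                # e.g. scratch datasets
    },
}
```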

@@ -548,7 +548,7 @@ def _get_parsed_audit_log_events(self, project_id: str) -> Iterable[QueryEvent]:
# handle the case where the read happens within our time range but the query
# completion event is delayed and happens after the configured end time.
corrected_start_time = self.start_time - self.config.max_query_duration
corrected_end_time = self.end_time + -self.config.max_query_duration
corrected_end_time = self.end_time + self.config.max_query_duration
self.report.log_entry_start_time = corrected_start_time
self.report.log_entry_end_time = corrected_end_time
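The fix above widens the scanned window at both ends by `max_query_duration`; the previous code added a negated duration and so shrank the end of the window instead. A minimal sketch of the intended arithmetic, with hypothetical values:

```python
from datetime import datetime, timedelta, timezone

# Hypothetical window and maximum query duration.
start_time = datetime(2023, 10, 24, 0, 0, tzinfo=timezone.utc)
end_time = datetime(2023, 10, 24, 6, 0, tzinfo=timezone.utc)
max_query_duration = timedelta(minutes=15)

# Widen both ends so queries that overlap the window but start earlier or
# finish later are still picked up from the audit log.
corrected_start_time = start_time - max_query_duration
corrected_end_time = end_time + max_query_duration  # old code: end_time + -max_query_duration

print(corrected_start_time.isoformat(), corrected_end_time.isoformat())
```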

@@ -335,8 +335,12 @@ def get_time_window(self) -> Tuple[datetime, datetime]:
def _is_table_allowed(self, table_ref: Optional[BigQueryTableRef]) -> bool:
return (
table_ref is not None
and self.config.dataset_pattern.allowed(table_ref.table_identifier.dataset)
and self.config.table_pattern.allowed(table_ref.table_identifier.table)
and self.config.dataset_pattern.allowed(
f"{table_ref.table_identifier.project_id}.{table_ref.table_identifier.dataset}"
if self.config.match_fully_qualified_names
else table_ref.table_identifier.dataset
)
and self.config.table_pattern.allowed(str(table_ref.table_identifier))
)

def _should_ingest_usage(self) -> bool:
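With this change, the dataset pattern is checked against the fully qualified `<project_id>.<dataset>` name when `match_fully_qualified_names` is enabled, and the table pattern against the full table identifier. A self-contained sketch of that selection logic, using plain regex allow-lists in place of DataHub's pattern config (names and helpers are hypothetical):

```python
import re
from typing import List, Optional


def is_table_allowed(
    project_id: Optional[str],
    dataset: Optional[str],
    table: Optional[str],
    dataset_allow: List[str],
    table_allow: List[str],
    match_fully_qualified_names: bool = True,
) -> bool:
    # Approximation of the logic above with plain regex lists instead of
    # DataHub's pattern config objects.
    if not (project_id and dataset and table):
        return False
    dataset_key = f"{project_id}.{dataset}" if match_fully_qualified_names else dataset
    table_key = f"{project_id}.{dataset}.{table}"
    return any(re.match(p, dataset_key) for p in dataset_allow) and any(
        re.match(p, table_key) for p in table_allow
    )


# Example: allow only the analytics dataset of one project, any table within it.
print(is_table_allowed("my-project", "analytics", "events", [r"my-project\.analytics"], [r".*"]))
```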
@@ -844,7 +848,7 @@ def _get_parsed_bigquery_log_events(
# handle the case where the read happens within our time range but the query
# completion event is delayed and happens after the configured end time.
corrected_start_time = self.start_time - self.config.max_query_duration
corrected_end_time = self.end_time + -self.config.max_query_duration
corrected_end_time = self.end_time + self.config.max_query_duration
self.report.audit_start_time = corrected_start_time
self.report.audit_end_time = corrected_end_time

@@ -31,7 +31,6 @@
register_custom_type,
)
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri
from datahub.ingestion.source.sql.sql_types import MapType
from datahub.ingestion.source.sql.sql_utils import (
add_table_to_schema_container,
gen_database_container,
@@ -41,6 +40,7 @@
from datahub.metadata.schema_classes import RecordTypeClass
from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column
from datahub.utilities.sqlalchemy_type_converter import (
MapType,
get_schema_fields_for_sqlalchemy_column,
)

@@ -37,7 +37,6 @@
DatasetSubTypes,
)
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
from datahub.ingestion.source.sql.sql_types import MapType
from datahub.ingestion.source.sql.sql_utils import (
add_table_to_schema_container,
downgrade_schema_from_v2,
@@ -91,6 +90,7 @@
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport
from datahub.utilities.sqlalchemy_type_converter import MapType

if TYPE_CHECKING:
from datahub.ingestion.source.ge_data_profiler import (
@@ -1,8 +1,6 @@
import re
from typing import Any, Dict, ValuesView

from sqlalchemy import types

from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayType,
BooleanType,
@@ -17,6 +15,7 @@
TimeType,
UnionType,
)
from datahub.utilities.sqlalchemy_type_converter import MapType

# these can be obtained by running `select format_type(oid, null),* from pg_type;`
# we've omitted the types without a meaningful DataHub type (e.g. postgres-specific types, index vectors, etc.)
@@ -369,12 +368,6 @@ def resolve_vertica_modified_type(type_string: str) -> Any:
"array": ArrayType,
}


class MapType(types.TupleType):
# Wrapper class around SQLalchemy's TupleType to increase compatibility with DataHub
pass


# https://docs.aws.amazon.com/athena/latest/ug/data-types.html
# https://github.com/dbt-athena/dbt-athena/tree/main
ATHENA_SQL_TYPES_MAP: Dict[str, Any] = {
@@ -7,13 +7,17 @@
from sqlalchemy_bigquery import STRUCT

from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.ingestion.source.sql.sql_types import MapType
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
from datahub.metadata.schema_classes import NullTypeClass, SchemaFieldDataTypeClass

logger = logging.getLogger(__name__)


class MapType(types.TupleType):
# Wrapper class around SQLalchemy's TupleType to increase compatibility with DataHub
pass


class SqlAlchemyColumnToAvroConverter:
"""Helper class that collects some methods to convert SQLalchemy columns to Avro schema."""

68 changes: 59 additions & 9 deletions metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
@@ -745,6 +745,47 @@ def _extract_select_from_create(
return statement


_UPDATE_ARGS_NOT_SUPPORTED_BY_SELECT: Set[str] = set(
sqlglot.exp.Update.arg_types.keys()
) - set(sqlglot.exp.Select.arg_types.keys())


def _extract_select_from_update(
statement: sqlglot.exp.Update,
) -> sqlglot.exp.Select:
statement = statement.copy()

# The "SET" expressions need to be converted.
# For the update command, it'll be a list of EQ expressions, but the select
# should contain aliased columns.
new_expressions = []
for expr in statement.expressions:
if isinstance(expr, sqlglot.exp.EQ) and isinstance(
expr.left, sqlglot.exp.Column
):
new_expressions.append(
sqlglot.exp.Alias(
this=expr.right,
alias=expr.left.this,
)
)
else:
# If we don't know how to convert it, just leave it as-is. If this causes issues,
# they'll get caught later.
new_expressions.append(expr)

return sqlglot.exp.Select(
**{
**{
k: v
for k, v in statement.args.items()
if k not in _UPDATE_ARGS_NOT_SUPPORTED_BY_SELECT
},
"expressions": new_expressions,
}
)


def _is_create_table_ddl(statement: sqlglot.exp.Expression) -> bool:
return isinstance(statement, sqlglot.exp.Create) and isinstance(
statement.this, sqlglot.exp.Schema
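To see what the `_extract_select_from_update` rewrite above produces, here is a small sqlglot sketch that performs the same EQ-to-Alias conversion inline on a hypothetical `UPDATE ... FROM` statement (it does not call the private helper and, for brevity, only carries over the `from`/`where` arguments):

```python
import sqlglot
from sqlglot import exp

# Hypothetical UPDATE with a FROM clause; each SET expression becomes an
# aliased SELECT projection so column-level lineage can be derived from it.
update = sqlglot.parse_one(
    "UPDATE sales SET total = price * qty FROM prices WHERE sales.id = prices.id"
)

aliased = [
    exp.Alias(this=e.right, alias=e.left.this)
    for e in update.expressions
    if isinstance(e, exp.EQ) and isinstance(e.left, exp.Column)
]
select = exp.Select(
    expressions=aliased,
    **{"from": update.args.get("from"), "where": update.args.get("where")},
)
print(select.sql())
# Roughly: SELECT price * qty AS total FROM prices WHERE sales.id = prices.id
```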
@@ -767,6 +808,9 @@ def _try_extract_select(
elif isinstance(statement, sqlglot.exp.Insert):
# TODO Need to map column renames in the expressions part of the statement.
statement = statement.expression
elif isinstance(statement, sqlglot.exp.Update):
# Assumption: the output table is already captured in the modified tables list.
statement = _extract_select_from_update(statement)
elif isinstance(statement, sqlglot.exp.Create):
# TODO May need to map column renames.
# Assumption: the output table is already captured in the modified tables list.
@@ -942,19 +986,25 @@ def _sqlglot_lineage_inner(
)

# Simplify the input statement for column-level lineage generation.
select_statement = _try_extract_select(statement)
try:
select_statement = _try_extract_select(statement)
except Exception as e:
logger.debug(f"Failed to extract select from statement: {e}", exc_info=True)
debug_info.column_error = e
select_statement = None

# Generate column-level lineage.
column_lineage: Optional[List[_ColumnLineageInfo]] = None
try:
column_lineage = _column_level_lineage(
select_statement,
dialect=dialect,
input_tables=table_name_schema_mapping,
output_table=downstream_table,
default_db=default_db,
default_schema=default_schema,
)
if select_statement is not None:
column_lineage = _column_level_lineage(
select_statement,
dialect=dialect,
input_tables=table_name_schema_mapping,
output_table=downstream_table,
default_db=default_db,
default_schema=default_schema,
)
except UnsupportedStatementTypeError as e:
# Inject details about the outer statement type too.
e.args = (f"{e.args[0]} (outer statement type: {type(statement)})",)