diff --git a/README.md b/README.md index b8dce8609e303..96b30ea49359c 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,7 @@ Here are the companies that have officially adopted DataHub. Please feel free to - [LinkedIn](http://linkedin.com) - [Peloton](https://www.onepeloton.com) - [Saxo Bank](https://www.home.saxo) +- [Stash](https://www.stash.com) - [Shanghai HuaRui Bank](https://www.shrbank.com) - [ThoughtWorks](https://www.thoughtworks.com) - [TypeForm](http://typeform.com) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/tag/TagType.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/tag/TagType.java index e58b670a24421..177e03a5c9659 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/tag/TagType.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/tag/TagType.java @@ -154,7 +154,7 @@ private boolean isAuthorized(@Nonnull TagUpdateInput update, @Nonnull QueryConte return AuthorizationUtils.isAuthorized( context.getAuthorizer(), context.getAuthentication().getActor().toUrnStr(), - PoliciesConfig.DATASET_PRIVILEGES.getResourceType(), + PoliciesConfig.TAG_PRIVILEGES.getResourceType(), update.getUrn(), orPrivilegeGroups); } diff --git a/docker/airflow/local_airflow.md b/docker/airflow/local_airflow.md index 3fce26c7105ac..376f1132469a5 100644 --- a/docker/airflow/local_airflow.md +++ b/docker/airflow/local_airflow.md @@ -1,7 +1,7 @@ # Running Airflow locally with DataHub ## Introduction -This document describes how you can run Airflow side-by-side with DataHub's docker images to test out Airflow lineage with DataHub. +This document describes how you can run Airflow side-by-side with DataHub's quickstart docker images to test out Airflow lineage with DataHub. This offers a much easier way to try out Airflow with DataHub, compared to configuring containers by hand, setting up configurations and networking connectivity between the two systems. ## Pre-requisites @@ -11,6 +11,7 @@ docker info | grep Memory > Total Memory: 7.775GiB ``` +- Quickstart: Ensure that you followed [quickstart](../../docs/quickstart.md) to get DataHub up and running. ## Step 1: Set up your Airflow area - Create an area to host your airflow installation @@ -20,7 +21,7 @@ docker info | grep Memory ``` mkdir -p airflow_install cd airflow_install -# Download docker-compose +# Download docker-compose file curl -L 'https://raw.githubusercontent.com/linkedin/datahub/master/docker/airflow/docker-compose.yaml' -o docker-compose.yaml # Create dags directory mkdir -p dags @@ -94,7 +95,7 @@ Default username and password is: airflow:airflow ``` -## Step 4: Register DataHub connection (hook) to Airflow +## Step 3: Register DataHub connection (hook) to Airflow ``` docker exec -it `docker ps | grep webserver | cut -d " " -f 1` airflow connections add --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://datahub-gms:8080' @@ -111,7 +112,7 @@ Successfully added `conn_id`=datahub_rest_default : datahub_rest://:@http://data - Note: This is what requires Airflow to be able to connect to `datahub-gms` the host (this is the container running datahub-gms image) and this is why we needed to connect the Airflow containers to the `datahub_network` using our custom docker-compose file. 
-## Step 3: Find the DAGs and run it +## Step 4: Find the DAGs and run it Navigate the Airflow UI to find the sample Airflow dag we just brought in ![Find the DAG](../../docs/imgs/airflow/find_the_dag.png) diff --git a/docker/datahub-gms/Dockerfile b/docker/datahub-gms/Dockerfile index f400a41b30216..62cb56bcf0a10 100644 --- a/docker/datahub-gms/Dockerfile +++ b/docker/datahub-gms/Dockerfile @@ -14,7 +14,7 @@ RUN apk --no-cache --update-cache --available upgrade \ else \ echo >&2 "Unsupported architecture $(arch)" ; exit 1; \ fi \ - && apk --no-cache add tar curl openjdk8-jre \ + && apk --no-cache add tar curl openjdk8-jre bash \ && curl https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-runner/9.4.20.v20190813/jetty-runner-9.4.20.v20190813.jar --output jetty-runner.jar \ && curl https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-jmx/9.4.20.v20190813/jetty-jmx-9.4.20.v20190813.jar --output jetty-jmx.jar \ && curl https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-util/9.4.20.v20190813/jetty-util-9.4.20.v20190813.jar --output jetty-util.jar \ diff --git a/docker/datahub-gms/start.sh b/docker/datahub-gms/start.sh index 941f1b8f68887..6281a07f3896f 100755 --- a/docker/datahub-gms/start.sh +++ b/docker/datahub-gms/start.sh @@ -1,18 +1,13 @@ -#!/bin/sh - +#!/bin/bash +set -x # Add default URI (http) scheme if needed if ! echo $NEO4J_HOST | grep -q "://" ; then NEO4J_HOST="http://$NEO4J_HOST" fi -if [[ -z $ELASTICSEARCH_USERNAME ]]; then - ELASTICSEARCH_HOST_URL=$ELASTICSEARCH_HOST -else - if [[ -z $ELASTICSEARCH_AUTH_HEADER ]]; then - ELASTICSEARCH_HOST_URL=$ELASTICSEARCH_USERNAME:$ELASTICSEARCH_PASSWORD@$ELASTICSEARCH_HOST - else - ELASTICSEARCH_HOST_URL=$ELASTICSEARCH_HOST - fi +if [[ ! -z $ELASTICSEARCH_USERNAME ]] && [[ -z $ELASTICSEARCH_AUTH_HEADER ]]; then + AUTH_TOKEN=$(echo -ne "$ELASTICSEARCH_USERNAME:$ELASTICSEARCH_PASSWORD" | base64 --wrap 0) + ELASTICSEARCH_AUTH_HEADER="Authorization:Basic $AUTH_TOKEN" fi # Add default header if needed @@ -26,9 +21,18 @@ else ELASTICSEARCH_PROTOCOL=http fi -WAIT_FOR_NEO4J="" +WAIT_FOR_EBEAN="" +if [[ $SKIP_EBEAN_CHECK != true ]]; then + WAIT_FOR_EBEAN=" -wait tcp://$EBEAN_DATASOURCE_HOST " +fi + +WAIT_FOR_KAFKA="" +if [[ $SKIP_KAFKA_CHECK != true ]]; then + WAIT_FOR_KAFKA=" -wait tcp://$(echo $KAFKA_BOOTSTRAP_SERVER | sed 's/,/ -wait tcp:\/\//g') " +fi -if [[ $GRAPH_SERVICE_IMPL != elasticsearch ]]; then +WAIT_FOR_NEO4J="" +if [[ $GRAPH_SERVICE_IMPL != elasticsearch ]] && [[ $SKIP_NEO4J_CHECK != true ]]; then WAIT_FOR_NEO4J=" -wait $NEO4J_HOST " fi @@ -42,16 +46,23 @@ if [[ $ENABLE_PROMETHEUS == true ]]; then PROMETHEUS_AGENT="-javaagent:jmx_prometheus_javaagent.jar=4318:/datahub/datahub-gms/scripts/prometheus-config.yaml " fi -dockerize \ - -wait tcp://$EBEAN_DATASOURCE_HOST \ - -wait tcp://$(echo $KAFKA_BOOTSTRAP_SERVER | sed 's/,/ -wait tcp:\/\//g') \ - -wait $ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT -wait-http-header "$ELASTICSEARCH_AUTH_HEADER" \ - $WAIT_FOR_NEO4J \ - -timeout 240s \ - java $JAVA_OPTS $JMX_OPTS \ - $OTEL_AGENT \ - $PROMETHEUS_AGENT \ - -jar /jetty-runner.jar \ - --jar jetty-util.jar \ - --jar jetty-jmx.jar \ - /datahub/datahub-gms/bin/war.war +COMMON=" + $WAIT_FOR_EBEAN \ + $WAIT_FOR_KAFKA \ + $WAIT_FOR_NEO4J \ + -timeout 240s \ + java $JAVA_OPTS $JMX_OPTS \ + $OTEL_AGENT \ + $PROMETHEUS_AGENT \ + -jar /jetty-runner.jar \ + --jar jetty-util.jar \ + --jar jetty-jmx.jar \ + /datahub/datahub-gms/bin/war.war" + +if [[ $SKIP_ELASTICSEARCH_CHECK != true ]]; then + dockerize \ + -wait 
$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT -wait-http-header "$ELASTICSEARCH_AUTH_HEADER" \ + $COMMON +else + dockerize $COMMON +fi diff --git a/docker/datahub-mae-consumer/Dockerfile b/docker/datahub-mae-consumer/Dockerfile index f112e61b7bbc8..5240100de3bca 100644 --- a/docker/datahub-mae-consumer/Dockerfile +++ b/docker/datahub-mae-consumer/Dockerfile @@ -3,7 +3,7 @@ ARG APP_ENV=prod FROM adoptopenjdk/openjdk8:alpine-jre as base ENV DOCKERIZE_VERSION v0.6.1 -RUN apk --no-cache add curl tar wget \ +RUN apk --no-cache add curl tar wget bash \ && wget https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v1.4.1/opentelemetry-javaagent-all.jar \ && wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.16.1/jmx_prometheus_javaagent-0.16.1.jar -O jmx_prometheus_javaagent.jar \ && curl -L https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz | tar -C /usr/local/bin -xzv diff --git a/docker/datahub-mae-consumer/start.sh b/docker/datahub-mae-consumer/start.sh index 158397dd0cb50..6c54b1b7ad2d1 100755 --- a/docker/datahub-mae-consumer/start.sh +++ b/docker/datahub-mae-consumer/start.sh @@ -1,18 +1,13 @@ -#!/bin/sh +#!/bin/bash # Add default URI (http) scheme if needed if ! echo $NEO4J_HOST | grep -q "://" ; then NEO4J_HOST="http://$NEO4J_HOST" fi -if [[ -z $ELASTICSEARCH_USERNAME ]]; then - ELASTICSEARCH_HOST_URL=$ELASTICSEARCH_HOST -else - if [[ -z $ELASTICSEARCH_AUTH_HEADER ]]; then - ELASTICSEARCH_HOST_URL=$ELASTICSEARCH_USERNAME:$ELASTICSEARCH_PASSWORD@$ELASTICSEARCH_HOST - else - ELASTICSEARCH_HOST_URL=$ELASTICSEARCH_HOST - fi +if [[ ! -z $ELASTICSEARCH_USERNAME ]] && [[ -z $ELASTICSEARCH_AUTH_HEADER ]]; then + AUTH_TOKEN=$(echo -ne "$ELASTICSEARCH_USERNAME:$ELASTICSEARCH_PASSWORD" | base64 --wrap 0) + ELASTICSEARCH_AUTH_HEADER="Authorization:Basic $AUTH_TOKEN" fi # Add default header if needed @@ -26,9 +21,18 @@ else ELASTICSEARCH_PROTOCOL=http fi -WAIT_FOR_NEO4J="" +WAIT_FOR_KAFKA="" +if [[ $SKIP_KAFKA_CHECK != true ]]; then + WAIT_FOR_KAFKA=" -wait tcp://$(echo $KAFKA_BOOTSTRAP_SERVER | sed 's/,/ -wait tcp:\/\//g') " +fi + +WAIT_FOR_ELASTICSEARCH="" +if [[ $SKIP_ELASTICSEARCH_CHECK != true ]]; then + WAIT_FOR_ELASTICSEARCH=" -wait $ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT -wait-http-header \"$ELASTICSEARCH_AUTH_HEADER\"" +fi -if [[ $GRAPH_SERVICE_IMPL != elasticsearch ]]; then +WAIT_FOR_NEO4J="" +if [[ $GRAPH_SERVICE_IMPL != elasticsearch ]] && [[ $SKIP_NEO4J_CHECK != true ]]; then WAIT_FOR_NEO4J=" -wait $NEO4J_HOST " fi @@ -42,9 +46,16 @@ if [[ $ENABLE_PROMETHEUS == true ]]; then PROMETHEUS_AGENT="-javaagent:jmx_prometheus_javaagent.jar=4318:/datahub/datahub-mae-consumer/scripts/prometheus-config.yaml " fi -dockerize \ - -wait tcp://$(echo $KAFKA_BOOTSTRAP_SERVER | sed 's/,/ -wait tcp:\/\//g') \ - -wait $ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT -wait-http-header "$ELASTICSEARCH_AUTH_HEADER" \ - $WAIT_FOR_NEO4J \ - -timeout 240s \ - java $JAVA_OPTS $JMX_OPTS $OTEL_AGENT $PROMETHEUS_AGENT -jar /datahub/datahub-mae-consumer/bin/mae-consumer-job.jar +COMMON=" + $WAIT_FOR_KAFKA \ + $WAIT_FOR_NEO4J \ + -timeout 240s \ + java $JAVA_OPTS $JMX_OPTS $OTEL_AGENT $PROMETHEUS_AGENT -jar /datahub/datahub-mae-consumer/bin/mae-consumer-job.jar +" +if [[ $SKIP_ELASTICSEARCH_CHECK != true ]]; then + dockerize $COMMON +else + dockerize \ + -wait 
$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT -wait-http-header "$ELASTICSEARCH_AUTH_HEADER" \ + $COMMON +fi diff --git a/docker/datahub-mce-consumer/Dockerfile b/docker/datahub-mce-consumer/Dockerfile index da32106c4ec94..9854c7f70748a 100644 --- a/docker/datahub-mce-consumer/Dockerfile +++ b/docker/datahub-mce-consumer/Dockerfile @@ -3,7 +3,7 @@ ARG APP_ENV=prod FROM adoptopenjdk/openjdk8:alpine-jre as base ENV DOCKERIZE_VERSION v0.6.1 -RUN apk --no-cache add curl tar wget \ +RUN apk --no-cache add curl tar wget bash \ && wget https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v1.4.1/opentelemetry-javaagent-all.jar \ && wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.16.1/jmx_prometheus_javaagent-0.16.1.jar -O jmx_prometheus_javaagent.jar \ && curl -L https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz | tar -C /usr/local/bin -xzv diff --git a/docker/datahub-mce-consumer/start.sh b/docker/datahub-mce-consumer/start.sh index ec319feac6011..6c0e47cd9dd17 100755 --- a/docker/datahub-mce-consumer/start.sh +++ b/docker/datahub-mce-consumer/start.sh @@ -1,4 +1,9 @@ -#!/bin/sh +#!/bin/bash + +WAIT_FOR_KAFKA="" +if [[ $SKIP_KAFKA_CHECK != true ]]; then + WAIT_FOR_KAFKA=" -wait tcp://$(echo $KAFKA_BOOTSTRAP_SERVER | sed 's/,/ -wait tcp:\/\//g') " +fi OTEL_AGENT="" if [[ $ENABLE_OTEL == true ]]; then @@ -11,6 +16,6 @@ if [[ $ENABLE_PROMETHEUS == true ]]; then fi dockerize \ - -wait tcp://$(echo $KAFKA_BOOTSTRAP_SERVER | sed 's/,/ -wait tcp:\/\//g') \ + $WAIT_FOR_KAFKA \ -timeout 240s \ java $JAVA_OPTS $JMX_OPTS $OTEL_AGENT $PROMETHEUS_AGENT -jar /datahub/datahub-mce-consumer/bin/mce-consumer-job.jar \ No newline at end of file diff --git a/docker/docker-compose.dev.yml b/docker/docker-compose.dev.yml index 706705fd2d960..c1183bf1a0283 100644 --- a/docker/docker-compose.dev.yml +++ b/docker/docker-compose.dev.yml @@ -37,6 +37,8 @@ services: dockerfile: Dockerfile args: APP_ENV: dev + environment: + - SKIP_ELASTICSEARCH_CHECK=false volumes: - ./datahub-gms/start.sh:/datahub/datahub-gms/scripts/start.sh - ./monitoring/client-prometheus-config.yaml:/datahub/datahub-gms/scripts/prometheus-config.yaml diff --git a/docker/elasticsearch-setup/Dockerfile b/docker/elasticsearch-setup/Dockerfile index 2c29e01b67d45..d9a662321b3b9 100644 --- a/docker/elasticsearch-setup/Dockerfile +++ b/docker/elasticsearch-setup/Dockerfile @@ -5,7 +5,7 @@ ARG APP_ENV=prod FROM alpine:3 as base ENV DOCKERIZE_VERSION v0.6.1 -RUN apk add --no-cache curl jq tar \ +RUN apk add --no-cache curl jq tar bash \ && curl -L https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz | tar -C /usr/local/bin -xzv FROM base AS prod-install @@ -21,4 +21,6 @@ FROM base AS dev-install FROM ${APP_ENV}-install AS final CMD if [ "$ELASTICSEARCH_USE_SSL" == "true" ]; then ELASTICSEARCH_PROTOCOL=https; else ELASTICSEARCH_PROTOCOL=http; fi \ && if [[ -n "$ELASTICSEARCH_USERNAME" ]]; then ELASTICSEARCH_HTTP_HEADERS="Authorization: Basic $(echo -ne "$ELASTICSEARCH_USERNAME:$ELASTICSEARCH_PASSWORD" | base64)"; else ELASTICSEARCH_HTTP_HEADERS="Accept: */*"; fi \ - && dockerize -wait $ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT -wait-http-header "${ELASTICSEARCH_HTTP_HEADERS}" -timeout 120s /create-indices.sh + && if [[ "$SKIP_ELASTICSEARCH_CHECK" != "true" ]]; then \ + dockerize -wait 
$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT -wait-http-header "${ELASTICSEARCH_HTTP_HEADERS}" -timeout 120s /create-indices.sh; \ + else /create-indices.sh; fi diff --git a/docker/elasticsearch-setup/create-indices.sh b/docker/elasticsearch-setup/create-indices.sh index c20114c7170a2..4ada7a7cb6962 100755 --- a/docker/elasticsearch-setup/create-indices.sh +++ b/docker/elasticsearch-setup/create-indices.sh @@ -1,4 +1,4 @@ -#!/bin/sh +#!/bin/bash set -e @@ -11,10 +11,14 @@ else ELASTICSEARCH_PROTOCOL=http fi -if [[ -z $ELASTICSEARCH_USERNAME ]]; then - ELASTICSEARCH_HOST_URL=$ELASTICSEARCH_HOST -else - ELASTICSEARCH_HOST_URL=$ELASTICSEARCH_USERNAME:$ELASTICSEARCH_PASSWORD@$ELASTICSEARCH_HOST +if [[ ! -z $ELASTICSEARCH_USERNAME ]] && [[ -z $ELASTICSEARCH_AUTH_HEADER ]]; then + AUTH_TOKEN=$(echo -ne "$ELASTICSEARCH_USERNAME:$ELASTICSEARCH_PASSWORD" | base64 --wrap 0) + ELASTICSEARCH_AUTH_HEADER="Authorization:Basic $AUTH_TOKEN" +fi + +# Add default header if needed +if [[ -z $ELASTICSEARCH_AUTH_HEADER ]]; then + ELASTICSEARCH_AUTH_HEADER="Accept: */*" fi function create_datahub_usage_event_datastream() { @@ -24,19 +28,19 @@ function create_datahub_usage_event_datastream() { PREFIX="${INDEX_PREFIX}_" fi - if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_ilm/policy/${PREFIX}datahub_usage_event_policy") -eq 404 ] + if [ $(curl -o /dev/null -s -w "%{http_code}" --header "$ELASTICSEARCH_AUTH_HEADER" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/_ilm/policy/${PREFIX}datahub_usage_event_policy") -eq 404 ] then echo -e "\ncreating datahub_usage_event_policy" sed -e "s/PREFIX/${PREFIX}/g" /index/usage-event/policy.json | tee -a /tmp/policy.json - curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_ilm/policy/${PREFIX}datahub_usage_event_policy" -H 'Content-Type: application/json' --data @/tmp/policy.json + curl -XPUT --header "$ELASTICSEARCH_AUTH_HEADER" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/_ilm/policy/${PREFIX}datahub_usage_event_policy" -H 'Content-Type: application/json' --data @/tmp/policy.json else echo -e "\ndatahub_usage_event_policy exists" fi - if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_index_template/${PREFIX}datahub_usage_event_index_template") -eq 404 ] + if [ $(curl -o /dev/null -s -w "%{http_code}" --header "$ELASTICSEARCH_AUTH_HEADER" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/_index_template/${PREFIX}datahub_usage_event_index_template") -eq 404 ] then echo -e "\ncreating datahub_usage_event_index_template" sed -e "s/PREFIX/${PREFIX}/g" /index/usage-event/index_template.json | tee -a /tmp/index_template.json - curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_index_template/${PREFIX}datahub_usage_event_index_template" -H 'Content-Type: application/json' --data @/tmp/index_template.json + curl -XPUT --header "$ELASTICSEARCH_AUTH_HEADER" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/_index_template/${PREFIX}datahub_usage_event_index_template" -H 'Content-Type: application/json' --data @/tmp/index_template.json else echo -e "\ndatahub_usage_event_index_template exists" fi @@ -49,20 +53,20 @@ function create_datahub_usage_event_aws_elasticsearch() { PREFIX="${INDEX_PREFIX}_" fi - if [ $(curl -o /dev/null -s -w "%{http_code}" 
"$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_opendistro/_ism/policies/${PREFIX}datahub_usage_event_policy") -eq 404 ] + if [ $(curl -o /dev/null -s -w "%{http_code}" --header "$ELASTICSEARCH_AUTH_HEADER" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/_opendistro/_ism/policies/${PREFIX}datahub_usage_event_policy") -eq 404 ] then echo -e "\ncreating datahub_usage_event_policy" sed -e "s/PREFIX/${PREFIX}/g" /index/usage-event/aws_es_ism_policy.json | tee -a /tmp/aws_es_ism_policy.json - curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_opendistro/_ism/policies/${PREFIX}datahub_usage_event_policy" -H 'Content-Type: application/json' --data @/tmp/aws_es_ism_policy.json + curl -XPUT --header "$ELASTICSEARCH_AUTH_HEADER" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/_opendistro/_ism/policies/${PREFIX}datahub_usage_event_policy" -H 'Content-Type: application/json' --data @/tmp/aws_es_ism_policy.json else echo -e "\ndatahub_usage_event_policy exists" fi - if [ $(curl -o /dev/null -s -w "%{http_code}" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_template/${PREFIX}datahub_usage_event_index_template") -eq 404 ] + if [ $(curl -o /dev/null -s -w "%{http_code}" --header "$ELASTICSEARCH_AUTH_HEADER" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/_template/${PREFIX}datahub_usage_event_index_template") -eq 404 ] then echo -e "\ncreating datahub_usagAe_event_index_template" sed -e "s/PREFIX/${PREFIX}/g" /index/usage-event/aws_es_index_template.json | tee -a /tmp/aws_es_index_template.json - curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/_template/${PREFIX}datahub_usage_event_index_template" -H 'Content-Type: application/json' --data @/tmp/aws_es_index_template.json - curl -XPUT "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST_URL:$ELASTICSEARCH_PORT/${PREFIX}datahub_usage_event-000001" -H 'Content-Type: application/json' --data "{\"aliases\":{\"${PREFIX}datahub_usage_event\":{\"is_write_index\":true}}}" + curl -XPUT --header "$ELASTICSEARCH_AUTH_HEADER" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/_template/${PREFIX}datahub_usage_event_index_template" -H 'Content-Type: application/json' --data @/tmp/aws_es_index_template.json + curl -XPUT --header "$ELASTICSEARCH_AUTH_HEADER" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/${PREFIX}datahub_usage_event-000001" -H 'Content-Type: application/json' --data "{\"aliases\":{\"${PREFIX}datahub_usage_event\":{\"is_write_index\":true}}}" else echo -e "\ndatahub_usage_event_index_template exists" fi @@ -75,4 +79,3 @@ if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then create_datahub_usage_event_aws_elasticsearch || exit 1 fi fi - diff --git a/docs-website/docusaurus.config.js b/docs-website/docusaurus.config.js index 9307ffece8763..2c7de89d8c75b 100644 --- a/docs-website/docusaurus.config.js +++ b/docs-website/docusaurus.config.js @@ -68,7 +68,7 @@ module.exports = { position: "right", }, { - href: "https://feature-requests.datahubproject.io/roadmap", + to: "docs/roadmap", label: "Roadmap", position: "right", }, @@ -151,7 +151,7 @@ module.exports = { }, { label: "Roadmap", - href: "https://feature-requests.datahubproject.io/roadmap", + to: "docs/roadmap", }, { label: "Contributing", diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index 36c178058c889..e31505f76edc4 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ 
-85,6 +85,21 @@ module.exports = { { Sinks: list_ids_in_directory("metadata-ingestion/sink_docs"), }, + { + Scheduling: [ + "metadata-ingestion/schedule_docs/intro", + "metadata-ingestion/schedule_docs/cron", + "metadata-ingestion/schedule_docs/airflow", + ], + }, + { + Lineage: [ + "docs/lineage/intro", + "docs/lineage/airflow", + "docker/airflow/local_airflow", + "docs/lineage/sample_code", + ], + }, ], "Metadata Modeling": [ "docs/modeling/metadata-model", @@ -191,7 +206,6 @@ module.exports = { "docs/how/delete-metadata", "datahub-web-react/src/app/analytics/README", "metadata-ingestion/developing", - "docker/airflow/local_airflow", "docs/how/add-custom-data-platform", "docs/how/add-custom-ingestion-source", { diff --git a/docs/cli.md b/docs/cli.md index 56ee0f69be671..3e34b84c5e8e9 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -1,10 +1,8 @@ # DataHub CLI -DataHub comes with a friendly cli called `datahub` that allows you to perform a lot of common operations using just the command line. +DataHub comes with a friendly cli called `datahub` that allows you to perform a lot of common operations using just the command line. -## Install - -### Using pip +## Using pip We recommend python virtual environments (venv-s) to namespace pip modules. Here's an example setup: @@ -16,6 +14,7 @@ source datahub-env/bin/activate # activate the environment **_NOTE:_** If you install `datahub` in a virtual environment, that same virtual environment must be re-activated each time a shell window or session is created. Once inside the virtual environment, install `datahub` using the following commands + ```console # Requires Python 3.6+ python3 -m pip install --upgrade pip wheel setuptools @@ -26,13 +25,16 @@ datahub version If you run into an error, try checking the [_common setup issues_](../metadata-ingestion/developing.md#Common-setup-issues). +### Using docker + +You can use the `datahub-ingestion` docker image as explained in [Docker Images](../docker/README.md). In case you are using Kubernetes you can start a pod with the `datahub-ingestion` docker image, log onto a shell on the pod and you should have the access to datahub CLI in your kubernetes cluster. + ## User Guide -The `datahub` cli allows you to do many things, such as quickstarting a DataHub docker instance locally, ingesting metadata from your sources, as well as retrieving and modifying metadata. -Like most command line tools, `--help` is your best friend. Use it to discover the capabilities of the cli and the different commands and sub-commands that are supported. +The `datahub` cli allows you to do many things, such as quickstarting a DataHub docker instance locally, ingesting metadata from your sources, as well as retrieving and modifying metadata. +Like most command line tools, `--help` is your best friend. Use it to discover the capabilities of the cli and the different commands and sub-commands that are supported. ```console -datahub --help Usage: datahub [OPTIONS] COMMAND [ARGS]... Options: @@ -41,22 +43,23 @@ Options: --help Show this message and exit. Commands: - check Helper commands for checking various aspects of DataHub. - delete Delete metadata from datahub using a single urn or a combination of filters - docker Helper commands for setting up and interacting with a local DataHub instance using Docker. - get Get metadata for an entity with an optional list of aspects to project - ingest Ingest metadata into DataHub. 
- init Configure which datahub instance to connect to - put Update a single aspect of an entity - version Print version number and exit. + check Helper commands for checking various aspects of DataHub. + delete Delete metadata from datahub using a single urn or a combination of filters + docker Helper commands for setting up and interacting with a local DataHub instance using Docker. + get Get metadata for an entity with an optional list of aspects to project + ingest Ingest metadata into DataHub. + init Configure which datahub instance to connect to + put Update a single aspect of an entity + telemetry Toggle telemetry. + version Print version number and exit. ``` -The following top-level commands listed below are here mainly to give the reader a high-level picture of what are the kinds of things you can accomplish with the cli. -We've ordered them roughly in the order we expect you to interact with these commands as you get deeper into the `datahub`-verse. +The following top-level commands listed below are here mainly to give the reader a high-level picture of what are the kinds of things you can accomplish with the cli. +We've ordered them roughly in the order we expect you to interact with these commands as you get deeper into the `datahub`-verse. ### docker -The `docker` command allows you to start up a local DataHub instance using `datahub docker quickstart`. You can also check if the docker cluster is healthy using `datahub docker check`. +The `docker` command allows you to start up a local DataHub instance using `datahub docker quickstart`. You can also check if the docker cluster is healthy using `datahub docker check`. ### ingest @@ -83,6 +86,15 @@ DATAHUB_GMS_TOKEN= # Used for communicating with DataHub Cloud The env variables take precedence over what is in the config. ``` +### telemetry + +To help us understand how people are using DataHub, we collect anonymous usage statistics on actions such as command invocations via Google Analytics. +We do not collect private information such as IP addresses, contents of ingestions, or credentials. +The code responsible for collecting and broadcasting these events is open-source and can be found [within our GitHub](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/telemetry/telemetry.py). + +Telemetry is enabled by default, and the `telemetry` command lets you toggle the sending of these statistics via `telemetry enable/disable`. +You can also disable telemetry by setting `DATAHUB_TELEMETRY_ENABLED` to `false`. + ### delete The `delete` command allows you to delete metadata from DataHub. Read this [guide](./how/delete-metadata.md) to understand how you can delete metadata from DataHub. @@ -93,7 +105,7 @@ datahub delete --urn "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset ### get -The `get` command allows you to easily retrieve metadata from DataHub, by using the REST API. +The `get` command allows you to easily retrieve metadata from DataHub, by using the REST API. For example the following command gets the ownership aspect from the dataset `urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)` ```console @@ -134,11 +146,12 @@ datahub get --urn "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PR } ``` -### put +### put -The `put` command allows you to write metadata into DataHub. This is a flexible way for you to issue edits to metadata from the command line. 
-For example, the following command instructs `datahub` to set the `ownership` aspect of the dataset `urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)` to the value in the file `ownership.json`. +The `put` command allows you to write metadata into DataHub. This is a flexible way for you to issue edits to metadata from the command line. +For example, the following command instructs `datahub` to set the `ownership` aspect of the dataset `urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)` to the value in the file `ownership.json`. The JSON in the `ownership.json` file needs to conform to the [`Ownership`](https://github.com/linkedin/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/common/Ownership.pdl) Aspect model as shown below. + ```json { "owners": [ @@ -161,8 +174,3 @@ datahub --debug put --urn "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDa curl -X POST -H 'User-Agent: python-requests/2.26.0' -H 'Accept-Encoding: gzip, deflate' -H 'Accept: */*' -H 'Connection: keep-alive' -H 'X-RestLi-Protocol-Version: 2.0.0' -H 'Content-Type: application/json' --data '{"proposal": {"entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", "aspectName": "ownership", "changeType": "UPSERT", "aspect": {"contentType": "application/json", "value": "{\"owners\": [{\"owner\": \"urn:li:corpUser:jdoe\", \"type\": \"DEVELOPER\"}, {\"owner\": \"urn:li:corpUser:jdub\", \"type\": \"DATAOWNER\"}]}"}}}' 'http://localhost:8080/aspects/?action=ingestProposal' Update succeeded with status 200 ``` - - - - - diff --git a/docs/how/delete-metadata.md b/docs/how/delete-metadata.md index a5e505d18b8fe..80d58aab255e4 100644 --- a/docs/how/delete-metadata.md +++ b/docs/how/delete-metadata.md @@ -4,15 +4,12 @@ There are a two ways to delete metadata from DataHub. - Delete metadata attached to entities by providing a specific urn or a filter that identifies a set of entities - Delete metadata affected by a single ingestion run +To follow this guide you need to use [DataHub CLI](../cli.md). + Read on to find out how to perform these kinds of deletes. _Note: Deleting metadata should only be done with care. Always use `--dry-run` to understand what will be deleted before proceeding. Prefer soft-deletes (`--soft`) unless you really want to nuke metadata rows. Hard deletes will actually delete rows in the primary store and recovering them will require using backups of the primary metadata store. Make sure you understand the implications of issuing soft-deletes versus hard-deletes before proceeding._ -## The `datahub` CLI - -To use the datahub CLI you follow the installation and configuration guide at [DataHub CLI](../cli.md) or you can use the `datahub-ingestion` docker image as explained in [Docker Images](../../docker/README.md). In case you are using Kubernetes you can start a pod with the `datahub-ingestion` docker image, log onto a shell on the pod and you should have the access to datahub CLI in your kubernetes cluster. - - ## Delete By Urn To delete all the data related to a single entity, run diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md new file mode 100644 index 0000000000000..26a89cc2109b8 --- /dev/null +++ b/docs/lineage/airflow.md @@ -0,0 +1,62 @@ +# Lineage with Airflow + +There's a couple ways to get lineage information from Airflow into DataHub. + + +## Using Datahub's Airflow lineage backend (recommended) + +:::caution + +The Airflow lineage backend is only supported in Airflow 1.10.15+ and 2.0.2+. 
+ +::: + +## Running on Docker locally + +If you are looking to run Airflow and DataHub using docker locally, follow the guide [here](../../docker/airflow/local_airflow.md). Otherwise proceed to follow the instructions below. + +## Setting up Airflow to use DataHub as Lineage Backend + +1. You need to install the required dependency in your Airflow environment. See https://registry.astronomer.io/providers/datahub/modules/datahublineagebackend + +```shell + pip install acryl-datahub[airflow] +``` + +2. You must configure an Airflow hook for Datahub. We support both a Datahub REST hook and a Kafka-based hook, but you only need one. + + ```shell + # For REST-based: + airflow connections add --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://localhost:8080' + # For Kafka-based (standard Kafka sink config can be passed via extras): + airflow connections add --conn-type 'datahub_kafka' 'datahub_kafka_default' --conn-host 'broker:9092' --conn-extra '{}' + ``` + +3. Add the following lines to your `airflow.cfg` file. + ```ini + [lineage] + backend = datahub_provider.lineage.datahub.DatahubLineageBackend + datahub_kwargs = { + "datahub_conn_id": "datahub_rest_default", + "cluster": "prod", + "capture_ownership_info": true, + "capture_tags_info": true, + "graceful_exceptions": true } + # The above indentation is important! + ``` + **Configuration options:** + - `datahub_conn_id` (required): Usually `datahub_rest_default` or `datahub_kafka_default`, depending on what you named the connection in step 2. + - `cluster` (defaults to "prod"): The "cluster" to associate Airflow DAGs and tasks with. + - `capture_ownership_info` (defaults to true): If true, the owners field of the DAG will be captured as a DataHub corpuser. + - `capture_tags_info` (defaults to true): If true, the tags field of the DAG will be captured as DataHub tags. + - `graceful_exceptions` (defaults to true): If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. +4. Configure `inlets` and `outlets` for your Airflow operators. For reference, look at the sample DAG in [`lineage_backend_demo.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_demo.py), or reference [`lineage_backend_taskflow_demo.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_taskflow_demo.py) if you're using the [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html). +5. [optional] Learn more about [Airflow lineage](https://airflow.apache.org/docs/apache-airflow/stable/lineage.html), including shorthand notation and some automation. + +## Emitting lineage via a separate operator + +Take a look at this sample DAG: + +- [`lineage_emission_dag.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_emission_dag.py) - emits lineage using the DatahubEmitterOperator. + +In order to use this example, you must first configure the Datahub hook. Like in ingestion, we support a Datahub REST hook and a Kafka-based hook. See step 2 above for details. \ No newline at end of file diff --git a/docs/lineage/intro.md b/docs/lineage/intro.md new file mode 100644 index 0000000000000..f74e269f01b3b --- /dev/null +++ b/docs/lineage/intro.md @@ -0,0 +1,3 @@ +# Introduction to Lineage + +See [this video](https://www.youtube.com/watch?v=rONGpsndzRw&ab_channel=DataHub) for Lineage 101 in DataHub. 
\ No newline at end of file diff --git a/docs/lineage/sample_code.md b/docs/lineage/sample_code.md new file mode 100644 index 0000000000000..5d18f965919c5 --- /dev/null +++ b/docs/lineage/sample_code.md @@ -0,0 +1,19 @@ +# Lineage sample code + +The following samples will cover emitting dataset-to-dataset, dataset-to-job-to-dataset, chart-to-dataset, dashboard-to-chart and job-to-dataflow lineages. +- [lineage_emitter_mcpw_rest.py](../../metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py) - emits simple bigquery table-to-table (dataset-to-dataset) lineage via REST as MetadataChangeProposalWrapper. +- [lineage_dataset_job_dataset.py](../../metadata-ingestion/examples/library/lineage_dataset_job_dataset.py) - emits mysql-to-airflow-to-kafka (dataset-to-job-to-dataset) lineage via REST as MetadataChangeProposalWrapper. +- [lineage_dataset_chart.py](../../metadata-ingestion/examples/library/lineage_dataset_chart.py) - emits the dataset-to-chart lineage via REST as MetadataChangeProposalWrapper. +- [lineage_chart_dashboard.py](../../metadata-ingestion/examples/library/lineage_chart_dashboard.py) - emits the chart-to-dashboard lineage via REST as MetadataChangeProposalWrapper. +- [lineage_job_dataflow.py](../../metadata-ingestion/examples/library/lineage_job_dataflow.py) - emits the job-to-dataflow lineage via REST as MetadataChangeProposalWrapper. +- [lineage_emitter_rest.py](../../metadata-ingestion/examples/library/lineage_emitter_rest.py) - emits simple dataset-to-dataset lineage via REST as MetadataChangeEvent. +- [lineage_emitter_kafka.py](../../metadata-ingestion/examples/library/lineage_emitter_kafka.py) - emits simple dataset-to-dataset lineage via Kafka as MetadataChangeEvent. +- [Datahub Snowflake Lineage](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py#L249) - emits Datahub's Snowflake lineage as MetadataChangeProposalWrapper. +- [Datahub Bigquery Lineage](https://github.com/linkedin/datahub/blob/a1bf95307b040074c8d65ebb86b5eb177fdcd591/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py#L229) - emits Datahub's Bigquery lineage as MetadataChangeProposalWrapper. +- [Datahub Dbt Lineage](https://github.com/linkedin/datahub/blob/a9754ebe83b6b73bc2bfbf49d9ebf5dbd2ca5a8f/metadata-ingestion/src/datahub/ingestion/source/dbt.py#L625,L630) - emits Datahub's DBT lineage as MetadataChangeEvent. + +NOTE: +- Emitting aspects as MetadataChangeProposalWrapper is recommended over emitting aspects via the +MetadataChangeEvent. +- Emitting any aspect associated with an entity completely overwrites the previous +value of the aspect associated with the entity. This means that emitting a lineage aspect associated with a dataset will overwrite lineage edges that already exist. \ No newline at end of file diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index 37d0ff399ac7f..894ccb87c42d4 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -186,101 +186,12 @@ In some cases, you might want to construct the MetadataChangeEvents yourself but - [DataHub emitter via REST](./src/datahub/emitter/rest_emitter.py) (same requirements as `datahub-rest`). - [DataHub emitter via Kafka](./src/datahub/emitter/kafka_emitter.py) (same requirements as `datahub-kafka`). -### Sample code -#### Lineage -The following samples will cover emitting dataset-to-dataset, dataset-to-job-to-dataset, chart-to-dataset, dashboard-to-chart and job-to-dataflow lineages. 
-- [lineage_emitter_mcpw_rest.py](./examples/library/lineage_emitter_mcpw_rest.py) - emits simple bigquery table-to-table (dataset-to-dataset) lineage via REST as MetadataChangeProposalWrapper. -- [lineage_dataset_job_dataset.py](./examples/library/lineage_dataset_job_dataset.py) - emits mysql-to-airflow-to-kafka (dataset-to-job-to-dataset) lineage via REST as MetadataChangeProposalWrapper. -- [lineage_dataset_chart.py](./examples/library/lineage_dataset_chart.py) - emits the dataset-to-chart lineage via REST as MetadataChangeProposalWrapper. -- [lineage_chart_dashboard.py](./examples/library/lineage_chart_dashboard.py) - emits the chart-to-dashboard lineage via REST as MetadataChangeProposalWrapper. -- [lineage_job_dataflow.py](./examples/library/lineage_job_dataflow.py) - emits the job-to-dataflow lineage via REST as MetadataChangeProposalWrapper. -- [lineage_emitter_rest.py](./examples/library/lineage_emitter_rest.py) - emits simple dataset-to-dataset lineage via REST as MetadataChangeEvent. -- [lineage_emitter_kafka.py](./examples/library/lineage_emitter_kafka.py) - emits simple dataset-to-dataset lineage via Kafka as MetadataChangeEvent. -- [Datahub Snowflake Lineage](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py#L249) - emits Datahub's Snowflake lineage as MetadataChangeProposalWrapper. -- [Datahub Bigquery Lineage](https://github.com/linkedin/datahub/blob/a1bf95307b040074c8d65ebb86b5eb177fdcd591/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py#L229) - emits Datahub's Bigquery lineage as MetadataChangeProposalWrapper. -- [Datahub Dbt Lineage](https://github.com/linkedin/datahub/blob/a9754ebe83b6b73bc2bfbf49d9ebf5dbd2ca5a8f/metadata-ingestion/src/datahub/ingestion/source/dbt.py#L625,L630) - emits Datahub's DBT lineage as MetadataChangeEvent. - -NOTE: -- Emitting aspects as MetadataChangeProposalWrapper is recommended over emitting aspects via the -MetadataChangeEvent. -- Emitting any aspect associated with an entity completely overwrites the previous -value of the aspect associated with the entity. This means that emitting a lineage aspect associated with a dataset will overwrite lineage edges that already exist. -#### Programmatic Pipeline + +### Programmatic Pipeline In some cases, you might want to configure and run a pipeline entirely from within your custom python script. Here is an example of how to do it. - [programmatic_pipeline.py](./examples/library/programatic_pipeline.py) - a basic mysql to REST programmatic pipeline. -## Lineage with Airflow - -There's a couple ways to get lineage information from Airflow into DataHub. - -:::note - -If you're simply looking to run ingestion on a schedule, take a look at these sample DAGs: - -- [`generic_recipe_sample_dag.py`](./src/datahub_provider/example_dags/generic_recipe_sample_dag.py) - reads a DataHub ingestion recipe file and runs it -- [`mysql_sample_dag.py`](./src/datahub_provider/example_dags/mysql_sample_dag.py) - runs a MySQL metadata ingestion pipeline using an inlined configuration. - -::: - -### Using Datahub's Airflow lineage backend (recommended) - -:::caution - -The Airflow lineage backend is only supported in Airflow 1.10.15+ and 2.0.2+. - -::: - -### Running on Docker locally - -If you are looking to run Airflow and DataHub using docker locally, follow the guide [here](../docker/airflow/local_airflow.md). Otherwise proceed to follow the instructions below. - -### Setting up Airflow to use DataHub as Lineage Backend - -1. 
You need to install the required dependency in your airflow. See https://registry.astronomer.io/providers/datahub/modules/datahublineagebackend - -```shell - pip install acryl-datahub[airflow] -``` - -2. You must configure an Airflow hook for Datahub. We support both a Datahub REST hook and a Kafka-based hook, but you only need one. - - ```shell - # For REST-based: - airflow connections add --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://localhost:8080' - # For Kafka-based (standard Kafka sink config can be passed via extras): - airflow connections add --conn-type 'datahub_kafka' 'datahub_kafka_default' --conn-host 'broker:9092' --conn-extra '{}' - ``` - -3. Add the following lines to your `airflow.cfg` file. - ```ini - [lineage] - backend = datahub_provider.lineage.datahub.DatahubLineageBackend - datahub_kwargs = { - "datahub_conn_id": "datahub_rest_default", - "cluster": "prod", - "capture_ownership_info": true, - "capture_tags_info": true, - "graceful_exceptions": true } - # The above indentation is important! - ``` - **Configuration options:** - - `datahub_conn_id` (required): Usually `datahub_rest_default` or `datahub_kafka_default`, depending on what you named the connection in step 1. - - `cluster` (defaults to "prod"): The "cluster" to associate Airflow DAGs and tasks with. - - `capture_ownership_info` (defaults to true): If true, the owners field of the DAG will be capture as a DataHub corpuser. - - `capture_tags_info` (defaults to true): If true, the tags field of the DAG will be captured as DataHub tags. - - `graceful_exceptions` (defaults to true): If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. -4. Configure `inlets` and `outlets` for your Airflow operators. For reference, look at the sample DAG in [`lineage_backend_demo.py`](./src/datahub_provider/example_dags/lineage_backend_demo.py), or reference [`lineage_backend_taskflow_demo.py`](./src/datahub_provider/example_dags/lineage_backend_taskflow_demo.py) if you're using the [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html). -5. [optional] Learn more about [Airflow lineage](https://airflow.apache.org/docs/apache-airflow/stable/lineage.html), including shorthand notation and some automation. - -### Emitting lineage via a separate operator - -Take a look at this sample DAG: - -- [`lineage_emission_dag.py`](./src/datahub_provider/example_dags/lineage_emission_dag.py) - emits lineage using the DatahubEmitterOperator. - -In order to use this example, you must first configure the Datahub hook. Like in ingestion, we support a Datahub REST hook and a Kafka-based hook. See step 1 above for details. - ## Developing See the guides on [developing](./developing.md), [adding a source](./adding-source.md) and [using transformers](./transformers.md). diff --git a/metadata-ingestion/schedule_docs/airflow.md b/metadata-ingestion/schedule_docs/airflow.md new file mode 100644 index 0000000000000..03a5930fea136 --- /dev/null +++ b/metadata-ingestion/schedule_docs/airflow.md @@ -0,0 +1,12 @@ +# Using Airflow + +If you are using Apache Airflow for your scheduling then you might want to also use it for scheduling your ingestion recipes. For any Airflow specific questions you can go through [Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/) for more details. 
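As a rough sketch of where the steps below end up (an editorial illustration, not part of this patch), a minimal DAG can simply shell out to the DataHub CLI on a schedule. The DAG id, schedule, and recipe path below are assumptions for illustration, and it presumes the `datahub` CLI is installed on the Airflow workers:

```python
# Editorial sketch: schedule a DataHub ingestion recipe from Airflow by invoking the CLI.
# Assumes acryl-datahub is installed on the workers and the recipe path exists.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2.x import path

with DAG(
    dag_id="datahub_recipe_ingest",      # hypothetical DAG id
    schedule_interval="5 0 * * *",       # five minutes past midnight, daily
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    run_recipe = BashOperator(
        task_id="run_datahub_recipe",
        bash_command="datahub ingest -c /home/ubuntu/datahub_ingest/mysql_to_datahub.yml",
    )
```

The bundled sample DAGs referenced in the steps below achieve the same outcome without shelling out; the sketch above is just the shortest thing that works.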
+ +To schedule your recipe through Airflow you can follow these steps: +- Create a recipe file, e.g. `recipe.yml` +- Ensure the recipe file is in a folder accessible to your Airflow workers. You can either specify an absolute path on the machines where Airflow is installed or a path relative to `AIRFLOW_HOME`. +- Ensure [DataHub CLI](../../docs/cli.md) is installed in your Airflow environment +- Create a sample DAG file like [`generic_recipe_sample_dag.py`](../src/datahub_provider/example_dags/generic_recipe_sample_dag.py). This will read your DataHub ingestion recipe file and run it. +- Deploy the DAG file into Airflow for scheduling. Typically this involves checking the DAG file into the dags folder that is accessible to your Airflow instance. + +Alternatively you can have an inline recipe as given in [`mysql_sample_dag.py`](../src/datahub_provider/example_dags/mysql_sample_dag.py). This runs a MySQL metadata ingestion pipeline using an inlined configuration. diff --git a/metadata-ingestion/schedule_docs/cron.md b/metadata-ingestion/schedule_docs/cron.md new file mode 100644 index 0000000000000..0264658cc1da5 --- /dev/null +++ b/metadata-ingestion/schedule_docs/cron.md @@ -0,0 +1,28 @@ +# Using Cron + +Assume you have a recipe file `/home/ubuntu/datahub_ingest/mysql_to_datahub.yml` on your machine: +``` +source: + type: mysql + config: + # Coordinates + host_port: localhost:3306 + database: dbname + + # Credentials + username: root + password: example + +sink: + type: datahub-rest + config: + server: http://localhost:8080 +``` + +We can use crontab to schedule the ingestion to run five minutes after midnight every day, using the [DataHub CLI](../../docs/cli.md). + +``` +5 0 * * * datahub ingest -c /home/ubuntu/datahub_ingest/mysql_to_datahub.yml +``` + +Read through the [crontab docs](https://man7.org/linux/man-pages/man5/crontab.5.html) for more options related to scheduling. \ No newline at end of file diff --git a/metadata-ingestion/schedule_docs/intro.md b/metadata-ingestion/schedule_docs/intro.md new file mode 100644 index 0000000000000..752b2c6e93ee2 --- /dev/null +++ b/metadata-ingestion/schedule_docs/intro.md @@ -0,0 +1,30 @@ +# Introduction to Scheduling Metadata Ingestion + +Given a recipe file `/home/ubuntu/datahub_ingest/mysql_to_datahub.yml`: +``` +source: + type: mysql + config: + # Coordinates + host_port: localhost:3306 + database: dbname + + # Credentials + username: root + password: example + +sink: + type: datahub-rest + config: + server: http://localhost:8080 +``` + +We can ingest our metadata using the [DataHub CLI](../../docs/cli.md) as follows: + +``` +datahub ingest -c /home/ubuntu/datahub_ingest/mysql_to_datahub.yml +``` + +This will ingest metadata from the `mysql` source configured in the recipe file. This runs the ingestion once. As the source system changes, we would like to have the changes reflected in DataHub, so someone needs to re-run the ingestion command using the recipe file. + +As an alternative to running the command manually, we can schedule the ingestion to run on a regular basis. In this section we give some examples of how scheduling ingestion of metadata into DataHub can be done. 
\ No newline at end of file diff --git a/metadata-ingestion/scripts/datahub_preflight.sh b/metadata-ingestion/scripts/datahub_preflight.sh index fea6ebf731ddd..0a72951a94ac5 100755 --- a/metadata-ingestion/scripts/datahub_preflight.sh +++ b/metadata-ingestion/scripts/datahub_preflight.sh @@ -68,7 +68,7 @@ if [ "$(basename "$(pwd)")" != "metadata-ingestion" ]; then exit 123 fi printf '✅ Current folder is metadata-ingestion (%s) folder\n' "$(pwd)" -if [[ $(uname -m) == 'arm64' || $(uname) == 'Darwin' ]]; then +if [[ $(uname -m) == 'arm64' && $(uname) == 'Darwin' ]]; then printf "👟 Running preflight for m1 mac\n" arm64_darwin_preflight fi diff --git a/metadata-ingestion/source_docs/business_glossary.md b/metadata-ingestion/source_docs/business_glossary.md index b2ebce6bc8ba0..a1cf8f417e348 100644 --- a/metadata-ingestion/source_docs/business_glossary.md +++ b/metadata-ingestion/source_docs/business_glossary.md @@ -46,7 +46,8 @@ The business glossary source file should be a `.yml` file with the following top - **users**: (optional) a list of user ids - **groups**: (optional) a list of group ids - **url**: (optional) external url pointing to where the glossary is defined externally, if applicable. -- **nodes**: list of **GlossaryNode** objects, as defined below. +- **nodes**: (optional) list of child **GlossaryNode** objects +- **terms**: (optional) list of child **GlossaryTerm** objects **GlossaryNode**: a container of **GlossaryNode** and **GlossaryTerm** objects diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index 3795ed1ee8300..a3e4636271112 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -97,6 +97,17 @@ def guess_entity_type(urn: str) -> str: return urn.split(":")[2] +def get_token(): + _, gms_token_env = get_details_from_env() + if should_skip_config(): + gms_token = gms_token_env + else: + ensure_datahub_config() + _, gms_token_conf = get_details_from_config() + gms_token = first_non_null([gms_token_env, gms_token_conf]) + return gms_token + + def get_session_and_host(): session = requests.Session() diff --git a/metadata-ingestion/src/datahub/cli/delete_cli.py b/metadata-ingestion/src/datahub/cli/delete_cli.py index aca3d948bc203..d212e3aeb852e 100644 --- a/metadata-ingestion/src/datahub/cli/delete_cli.py +++ b/metadata-ingestion/src/datahub/cli/delete_cli.py @@ -14,6 +14,7 @@ from datahub.emitter import rest_emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.metadata.schema_classes import ChangeTypeClass, StatusClass +from datahub.telemetry import telemetry logger = logging.getLogger(__name__) @@ -87,6 +88,7 @@ def delete_for_registry( @click.option("--query", required=False, type=str) @click.option("--registry-id", required=False, type=str) @click.option("-n", "--dry-run", required=False, is_flag=True) +@telemetry.with_telemetry def delete( urn: str, force: bool, @@ -172,6 +174,7 @@ def delete( ) +@telemetry.with_telemetry def delete_with_filters( dry_run: bool, soft: bool, @@ -181,9 +184,12 @@ def delete_with_filters( env: Optional[str] = None, platform: Optional[str] = None, ) -> DeletionResult: + session, gms_host = cli_utils.get_session_and_host() + token = cli_utils.get_token() + logger.info(f"datahub configured with {gms_host}") - emitter = rest_emitter.DatahubRestEmitter(gms_server=gms_host) + emitter = rest_emitter.DatahubRestEmitter(gms_server=gms_host, token=token) batch_deletion_result = DeletionResult() urns = 
[ u @@ -216,6 +222,7 @@ def delete_with_filters( return batch_deletion_result +@telemetry.with_telemetry def delete_one_urn( urn: str, soft: bool = False, @@ -224,6 +231,7 @@ def delete_one_urn( cached_session_host: Optional[Tuple[sessions.Session, str]] = None, cached_emitter: Optional[rest_emitter.DatahubRestEmitter] = None, ) -> DeletionResult: + deletion_result = DeletionResult() deletion_result.num_entities = 1 deletion_result.num_records = UNKNOWN_NUM_RECORDS # Default is unknown @@ -232,7 +240,8 @@ def delete_one_urn( # Add removed aspect if not cached_emitter: _, gms_host = cli_utils.get_session_and_host() - emitter = rest_emitter.DatahubRestEmitter(gms_server=gms_host) + token = cli_utils.get_token() + emitter = rest_emitter.DatahubRestEmitter(gms_server=gms_host, token=token) else: emitter = cached_emitter if not dry_run: diff --git a/metadata-ingestion/src/datahub/cli/docker.py b/metadata-ingestion/src/datahub/cli/docker.py index ae37e6111c45b..1c44c47748170 100644 --- a/metadata-ingestion/src/datahub/cli/docker.py +++ b/metadata-ingestion/src/datahub/cli/docker.py @@ -18,6 +18,7 @@ get_client_with_error, ) from datahub.ingestion.run.pipeline import Pipeline +from datahub.telemetry import telemetry logger = logging.getLogger(__name__) @@ -72,6 +73,7 @@ def docker_check_impl() -> None: @docker.command() +@telemetry.with_telemetry def check() -> None: """Check that the Docker containers are healthy""" docker_check_impl() @@ -162,6 +164,7 @@ def should_use_neo4j_for_graph_service(graph_service_override: Optional[str]) -> default=None, help="If set, forces docker-compose to use that graph service implementation", ) +@telemetry.with_telemetry def quickstart( version: str, build_locally: bool, @@ -315,6 +318,7 @@ def quickstart( type=click.Path(exists=True, dir_okay=False), help=f"The MCE json file to ingest. Defaults to downloading {BOOTSTRAP_MCES_FILE} from GitHub", ) +@telemetry.with_telemetry def ingest_sample_data(path: Optional[str]) -> None: """Ingest sample data into a running DataHub instance.""" @@ -360,6 +364,7 @@ def ingest_sample_data(path: Optional[str]) -> None: @docker.command() +@telemetry.with_telemetry def nuke() -> None: """Remove all Docker containers, networks, and volumes associated with DataHub.""" diff --git a/metadata-ingestion/src/datahub/cli/get_cli.py b/metadata-ingestion/src/datahub/cli/get_cli.py index db4343522419a..95a28886cae51 100644 --- a/metadata-ingestion/src/datahub/cli/get_cli.py +++ b/metadata-ingestion/src/datahub/cli/get_cli.py @@ -6,6 +6,7 @@ from click.exceptions import UsageError from datahub.cli.cli_utils import get_entity +from datahub.telemetry import telemetry logger = logging.getLogger(__name__) @@ -20,8 +21,10 @@ @click.option("--urn", required=False, type=str) @click.option("-a", "--aspect", required=False, multiple=True, type=str) @click.pass_context +@telemetry.with_telemetry def get(ctx: Any, urn: Optional[str], aspect: List[str]) -> None: """Get metadata for an entity with an optional list of aspects to project""" + if urn is None: if not ctx.args: raise UsageError("Nothing for me to get. 
Maybe provide an urn?") diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 2c4317013ef5a..0d916641ba6db 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -18,6 +18,7 @@ ) from datahub.configuration.config_loader import load_config_file from datahub.ingestion.run.pipeline import Pipeline +from datahub.telemetry import telemetry logger = logging.getLogger(__name__) @@ -61,8 +62,10 @@ def ingest() -> None: default=False, help="If enabled, ingestion runs with warnings will yield a non-zero error code", ) +@telemetry.with_telemetry def run(config: str, dry_run: bool, preview: bool, strict_warnings: bool) -> None: """Ingest metadata into DataHub.""" + logger.debug("DataHub CLI version: %s", datahub_package.nice_version_name()) config_file = pathlib.Path(config) @@ -108,8 +111,10 @@ def parse_restli_response(response): @ingest.command() @click.argument("page_offset", type=int, default=0) @click.argument("page_size", type=int, default=100) +@telemetry.with_telemetry def list_runs(page_offset: int, page_size: int) -> None: """List recent ingestion runs to datahub""" + session, gms_host = get_session_and_host() url = f"{gms_host}/runs?action=list" @@ -143,8 +148,10 @@ def list_runs(page_offset: int, page_size: int) -> None: @ingest.command() @click.option("--run-id", required=True, type=str) +@telemetry.with_telemetry def show(run_id: str) -> None: """Describe a provided ingestion run to datahub""" + payload_obj = {"runId": run_id, "dryRun": True} structured_rows, entities_affected, aspects_affected = post_rollback_endpoint( payload_obj, "/runs?action=rollback" @@ -171,6 +178,7 @@ def show(run_id: str) -> None: @ingest.command() @click.option("--run-id", required=True, type=str) @click.option("--dry-run", "-n", required=False, is_flag=True, default=False) +@telemetry.with_telemetry def rollback(run_id: str, dry_run: bool) -> None: """Rollback a provided ingestion run to datahub""" diff --git a/metadata-ingestion/src/datahub/cli/put_cli.py b/metadata-ingestion/src/datahub/cli/put_cli.py index 05b09e9be7a58..69da72aab155a 100644 --- a/metadata-ingestion/src/datahub/cli/put_cli.py +++ b/metadata-ingestion/src/datahub/cli/put_cli.py @@ -5,6 +5,7 @@ import click from datahub.cli.cli_utils import guess_entity_type, post_entity +from datahub.telemetry import telemetry logger = logging.getLogger(__name__) @@ -20,8 +21,10 @@ @click.option("-a", "--aspect", required=True, type=str) @click.option("-d", "--aspect-data", required=True, type=str) @click.pass_context +@telemetry.with_telemetry def put(ctx: Any, urn: str, aspect: str, aspect_data: str) -> None: """Update a single aspect of an entity""" + entity_type = guess_entity_type(urn) with open(aspect_data) as fp: aspect_obj = json.load(fp) diff --git a/metadata-ingestion/src/datahub/cli/telemetry.py b/metadata-ingestion/src/datahub/cli/telemetry.py new file mode 100644 index 0000000000000..5bc771568a241 --- /dev/null +++ b/metadata-ingestion/src/datahub/cli/telemetry.py @@ -0,0 +1,22 @@ +import click + +from datahub.telemetry import telemetry as telemetry_lib + + +@click.group() +def telemetry() -> None: + """Toggle telemetry.""" + pass + + +@telemetry.command() +@telemetry_lib.with_telemetry +def enable() -> None: + """Enable telemetry for the current DataHub instance.""" + telemetry_lib.telemetry_instance.enable() + + +@telemetry.command() +def disable() -> None: + """Disable telemetry for the current DataHub 
instance.""" + telemetry_lib.telemetry_instance.disable() diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index fae756341c00d..f33546358e220 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -70,6 +70,10 @@ def make_tag_urn(tag: str) -> str: return f"urn:li:tag:{tag}" +def make_term_urn(term: str) -> str: + return f"urn:li:glossaryTerm:{term}" + + def make_data_flow_urn( orchestrator: str, flow_id: str, cluster: str = DEFAULT_FLOW_CLUSTER ) -> str: diff --git a/metadata-ingestion/src/datahub/entrypoints.py b/metadata-ingestion/src/datahub/entrypoints.py index 486752ee840fb..22d60516e2b92 100644 --- a/metadata-ingestion/src/datahub/entrypoints.py +++ b/metadata-ingestion/src/datahub/entrypoints.py @@ -13,6 +13,8 @@ from datahub.cli.get_cli import get from datahub.cli.ingest_cli import ingest from datahub.cli.put_cli import put +from datahub.cli.telemetry import telemetry as telemetry_cli +from datahub.telemetry import telemetry logger = logging.getLogger(__name__) @@ -55,15 +57,19 @@ def datahub(debug: bool) -> None: @datahub.command() +@telemetry.with_telemetry def version() -> None: """Print version number and exit.""" + click.echo(f"DataHub CLI version: {datahub_package.nice_version_name()}") click.echo(f"Python version: {sys.version}") @datahub.command() +@telemetry.with_telemetry def init() -> None: """Configure which datahub instance to connect to""" + if os.path.isfile(DATAHUB_CONFIG_PATH): click.confirm(f"{DATAHUB_CONFIG_PATH} already exists. Overwrite?", abort=True) @@ -87,6 +93,7 @@ def init() -> None: datahub.add_command(delete) datahub.add_command(get) datahub.add_command(put) +datahub.add_command(telemetry_cli) def main(**kwargs): diff --git a/metadata-ingestion/src/datahub/ingestion/api/report.py b/metadata-ingestion/src/datahub/ingestion/api/report.py index e2efdba1fd0ba..ad9834b83081b 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/report.py +++ b/metadata-ingestion/src/datahub/ingestion/api/report.py @@ -1,15 +1,26 @@ import json import pprint +import sys from dataclasses import dataclass +from typing import Dict + +# The sort_dicts option was added in Python 3.8. 
+if sys.version_info >= (3, 8): + PPRINT_OPTIONS = {"sort_dicts": False} +else: + PPRINT_OPTIONS: Dict = {} @dataclass class Report: def as_obj(self) -> dict: - return self.__dict__ + return { + key: value.as_obj() if hasattr(value, "as_obj") else value + for (key, value) in self.__dict__.items() + } def as_string(self) -> str: - return pprint.pformat(self.as_obj(), width=150) + return pprint.pformat(self.as_obj(), width=150, **PPRINT_OPTIONS) def as_json(self) -> str: return json.dumps(self.as_obj()) diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index 627b5612e8d6c..8659c3038bc05 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -47,7 +47,10 @@ ValueFrequencyClass, ) from datahub.utilities.perf_timer import PerfTimer -from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombiner +from datahub.utilities.sqlalchemy_query_combiner import ( + SQLAlchemyQueryCombiner, + get_query_columns, +) logger: logging.Logger = logging.getLogger(__name__) @@ -204,7 +207,10 @@ def _convert_to_cardinality( def _is_single_row_query_method(query: Any) -> bool: - SINGLE_ROW_QUERY_FILE = "great_expectations/dataset/sqlalchemy_dataset.py" + SINGLE_ROW_QUERY_FILES = { + # "great_expectations/dataset/dataset.py", + "great_expectations/dataset/sqlalchemy_dataset.py", + } SINGLE_ROW_QUERY_METHODS = { "get_row_count", "get_column_min", @@ -214,8 +220,18 @@ def _is_single_row_query_method(query: Any) -> bool: "get_column_stdev", "get_column_nonnull_count", "get_column_unique_count", - # This actually returns two rows, not a single row, so we can't combine it with other queries. - # "get_column_median", + } + CONSTANT_ROW_QUERY_METHODS = { + # This actually returns two rows instead of a single row. + "get_column_median", + } + UNPREDICTABLE_ROW_QUERY_METHODS = { + "get_column_value_counts", + } + UNHANDLEABLE_ROW_QUERY_METHODS = { + "expect_column_kl_divergence_to_be_less_than", + "get_column_quantiles", # this is here for now since SQLAlchemy anonymous columns need investigation + "get_column_hist", # this requires additional investigation } COLUMN_MAP_QUERY_METHOD = "inner_wrapper" COLUMN_MAP_QUERY_SINGLE_ROW_COLUMNS = [ @@ -226,16 +242,27 @@ def _is_single_row_query_method(query: Any) -> bool: # We'll do this the inefficient way since the arrays are pretty small. stack = traceback.extract_stack() - for frame in stack: - if not frame.filename.endswith(SINGLE_ROW_QUERY_FILE): + for frame in reversed(stack): + if not any(frame.filename.endswith(file) for file in SINGLE_ROW_QUERY_FILES): continue + + if frame.name in UNPREDICTABLE_ROW_QUERY_METHODS: + return False + if frame.name in UNHANDLEABLE_ROW_QUERY_METHODS: + return False if frame.name in SINGLE_ROW_QUERY_METHODS: return True + if frame.name in CONSTANT_ROW_QUERY_METHODS: + # TODO: figure out how to handle these. + # A cross join will return (`constant` ** `queries`) rows rather + # than `constant` rows with `queries` columns. + # See https://stackoverflow.com/questions/35638753/create-query-to-join-2-tables-1-on-1-with-nothing-in-common. + return False if frame.name == COLUMN_MAP_QUERY_METHOD: # Some column map expectations are single-row. # We can disambiguate by checking the column names. 
- query_columns = query.columns + query_columns = get_query_columns(query) column_names = [column.name for column in query_columns] if column_names == COLUMN_MAP_QUERY_SINGLE_ROW_COLUMNS: @@ -446,6 +473,7 @@ def _get_dataset_column_sample_values( self, column_profile: DatasetFieldProfileClass, column: str ) -> None: if self.config.include_field_sample_values: + # TODO do this without GE self.dataset.set_config_value("interactive_evaluation", True) res = self.dataset.expect_column_values_to_be_in_set( @@ -662,7 +690,9 @@ def _ge_context(self) -> Iterator[GEContext]: def generate_profiles( self, requests: List[GEProfilerRequest], max_workers: int ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]: - with PerfTimer() as timer, SQLAlchemyQueryCombiner( + with PerfTimer() as timer, concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers + ) as async_executor, SQLAlchemyQueryCombiner( enabled=self.config.query_combiner_enabled, catch_exceptions=self.config.catch_exceptions, is_single_row_query_method=_is_single_row_query_method, @@ -672,28 +702,28 @@ def generate_profiles( logger.info( f"Will profile {len(requests)} table(s) with {max_workers} worker(s) - this may take a while" ) - with concurrent.futures.ThreadPoolExecutor( - max_workers=max_workers - ) as async_executor: - async_profiles = [ - async_executor.submit( - self._generate_profile_from_request, - query_combiner, - request, - ) - for request in requests - ] - # Avoid using as_completed so that the results are yielded in the - # same order as the requests. - # for async_profile in concurrent.futures.as_completed(async_profiles): - for async_profile in async_profiles: - yield async_profile.result() + async_profiles = [ + async_executor.submit( + self._generate_profile_from_request, + query_combiner, + request, + ) + for request in requests + ] + + # Avoid using as_completed so that the results are yielded in the + # same order as the requests. + # for async_profile in concurrent.futures.as_completed(async_profiles): + for async_profile in async_profiles: + yield async_profile.result() logger.info( f"Profiling {len(requests)} table(s) finished in {(timer.elapsed_seconds()):.3f} seconds" ) + self.report.report_from_query_combiner(query_combiner.report) + def _generate_profile_from_request( self, query_combiner: SQLAlchemyQueryCombiner, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py index 664ddddaa9ef9..692eaba595c9a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py @@ -7,10 +7,10 @@ # This import verifies that the dependencies are available. import snowflake.sqlalchemy # noqa: F401 -from snowflake.sqlalchemy import custom_types +from snowflake.sqlalchemy import custom_types, snowdialect from sqlalchemy import create_engine, inspect from sqlalchemy.engine.reflection import Inspector -from sqlalchemy.sql import text +from sqlalchemy.sql import sqltypes, text import datahub.emitter.mce_builder as builder from datahub.configuration.common import AllowDenyPattern @@ -44,6 +44,8 @@ APPLICATION_NAME = "acryl_datahub" +snowdialect.ischema_names["GEOGRAPHY"] = sqltypes.NullType + class BaseSnowflakeConfig(BaseTimeWindowConfig): # Note: this config model is also used by the snowflake-usage source. 
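The snowflake.py hunk above registers Snowflake's `GEOGRAPHY` type as a SQLAlchemy `NullType` via the dialect's `ischema_names`, presumably so that schema reflection treats it as a generic unknown type instead of an unrecognized one. A minimal sketch of that effect follows; the connection string and table name are placeholders, not values from this change.

```python
# Sketch only: demonstrates the ischema_names registration from snowflake.py above.
# The DSN and table name are placeholders.
from snowflake.sqlalchemy import snowdialect
from sqlalchemy import create_engine, inspect
from sqlalchemy.sql import sqltypes

# Same registration as in the hunk above: GEOGRAPHY columns reflect as NullType.
snowdialect.ischema_names["GEOGRAPHY"] = sqltypes.NullType

engine = create_engine("snowflake://<user>:<password>@<account>/<database>/<schema>")
inspector = inspect(engine)
for column in inspector.get_columns("customer_locations"):  # hypothetical table
    # A GEOGRAPHY column now comes back with a generic NullType rather than an
    # unrecognized type, so downstream schema extraction can handle it uniformly.
    print(column["name"], type(column["type"]).__name__)
```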
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 4fbf5f58e3439..28af83929cd7e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -47,6 +47,7 @@ TimeTypeClass, ) from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass +from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport if TYPE_CHECKING: from datahub.ingestion.source.ge_data_profiler import ( @@ -124,6 +125,8 @@ class SQLSourceReport(SourceReport): views_scanned: int = 0 filtered: List[str] = field(default_factory=list) + query_combiner: Optional[SQLAlchemyQueryCombinerReport] = None + def report_entity_scanned(self, name: str, ent_type: str = "table") -> None: """ Entity could be a view or a table @@ -138,6 +141,11 @@ def report_entity_scanned(self, name: str, ent_type: str = "table") -> None: def report_dropped(self, ent_name: str) -> None: self.filtered.append(ent_name) + def report_from_query_combiner( + self, query_combiner_report: SQLAlchemyQueryCombinerReport + ) -> None: + self.query_combiner = query_combiner_report + class SQLAlchemyConfig(ConfigModel): env: str = DEFAULT_ENV diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py b/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py index 8869242f2392a..52579db8699a4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py @@ -248,6 +248,5 @@ def _parse_basic_datatype(s): return {"type": "null", "native_data_type": repr(s)} - else: raise ModuleNotFoundError("The trino plugin requires Python 3.7 or newer.") diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/starburst_trino_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/starburst_trino_usage.py index 099ee3e188a2a..77f0712ad289a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/starburst_trino_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/starburst_trino_usage.py @@ -248,6 +248,5 @@ def get_report(self) -> SourceReport: def close(self) -> None: pass - else: raise ModuleNotFoundError("The trino usage plugin requires Python 3.7 or newer.") diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py index 688da04e94b25..3232eae0d13ba 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py @@ -1,7 +1,7 @@ from typing import Callable, List, Union import datahub.emitter.mce_builder as builder -from datahub.configuration.common import ConfigModel +from datahub.configuration.common import ConfigModel, KeyValuePattern from datahub.configuration.import_resolver import pydantic_resolve_key from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer @@ -74,3 +74,25 @@ def __init__(self, config: SimpleDatasetTagConfig, ctx: PipelineContext): def create(cls, config_dict: dict, ctx: PipelineContext) -> "SimpleAddDatasetTags": config = SimpleDatasetTagConfig.parse_obj(config_dict) return cls(config, ctx) + + +class PatternDatasetTagsConfig(ConfigModel): + tag_pattern: KeyValuePattern = KeyValuePattern.all() + + +class 
PatternAddDatasetTags(AddDatasetTags): + """Transformer that adds a specified set of tags to each dataset.""" + + def __init__(self, config: PatternDatasetTagsConfig, ctx: PipelineContext): + tag_pattern = config.tag_pattern + generic_config = AddDatasetTagsConfig( + get_tags_to_add=lambda _: [ + TagAssociationClass(tag=urn) for urn in tag_pattern.value(_.urn) + ], + ) + super().__init__(generic_config, ctx) + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "PatternAddDatasetTags": + config = PatternDatasetTagsConfig.parse_obj(config_dict) + return cls(config, ctx) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_terms.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_terms.py new file mode 100644 index 0000000000000..9e1bd81599719 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_terms.py @@ -0,0 +1,105 @@ +from typing import Callable, List, Union + +import datahub.emitter.mce_builder as builder +from datahub.configuration.common import ConfigModel, KeyValuePattern +from datahub.configuration.import_resolver import pydantic_resolve_key +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer +from datahub.metadata.schema_classes import ( + AuditStampClass, + DatasetSnapshotClass, + GlossaryTermAssociationClass, + GlossaryTermsClass, + MetadataChangeEventClass, +) + + +class AddDatasetTermsConfig(ConfigModel): + # Workaround for https://github.com/python/mypy/issues/708. + # Suggested by https://stackoverflow.com/a/64528725/5004662. + get_terms_to_add: Union[ + Callable[[DatasetSnapshotClass], List[GlossaryTermAssociationClass]], + Callable[[DatasetSnapshotClass], List[GlossaryTermAssociationClass]], + ] + + _resolve_term_fn = pydantic_resolve_key("get_terms_to_add") + + +class AddDatasetTerms(DatasetTransformer): + """Transformer that adds glossary terms to datasets according to a callback function.""" + + ctx: PipelineContext + config: AddDatasetTermsConfig + + def __init__(self, config: AddDatasetTermsConfig, ctx: PipelineContext): + self.ctx = ctx + self.config = config + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTerms": + config = AddDatasetTermsConfig.parse_obj(config_dict) + return cls(config, ctx) + + def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass: + if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass): + return mce + terms_to_add = self.config.get_terms_to_add(mce.proposedSnapshot) + if terms_to_add: + terms = builder.get_or_add_aspect( + mce, + GlossaryTermsClass( + terms=[], + auditStamp=AuditStampClass( + time=builder.get_sys_time(), actor="urn:li:corpUser:restEmitter" + ), + ), + ) + terms.terms.extend(terms_to_add) + + return mce + + +class SimpleDatasetTermsConfig(ConfigModel): + term_urns: List[str] + + +class SimpleAddDatasetTerms(AddDatasetTerms): + """Transformer that adds a specified set of glossary terms to each dataset.""" + + def __init__(self, config: SimpleDatasetTermsConfig, ctx: PipelineContext): + terms = [GlossaryTermAssociationClass(urn=term) for term in config.term_urns] + + generic_config = AddDatasetTermsConfig( + get_terms_to_add=lambda _: terms, + ) + super().__init__(generic_config, ctx) + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "SimpleAddDatasetTerms": + config = SimpleDatasetTermsConfig.parse_obj(config_dict) + return cls(config, ctx) 
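As an aside on the new `add_dataset_terms` module: the snippet below is an illustration (not part of this patch) of exercising `SimpleAddDatasetTerms` directly, mirroring the unit tests added later in this diff. The run id and term names are examples only.

```python
# Illustration only, not part of the patch: driving SimpleAddDatasetTerms by hand,
# the same way the new unit tests in test_transform_dataset.py do.
import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.transformer.add_dataset_terms import SimpleAddDatasetTerms

transformer = SimpleAddDatasetTerms.create(
    {
        "term_urns": [
            builder.make_term_urn("Email"),
            builder.make_term_urn("Address"),
        ]
    },
    PipelineContext(run_id="terms-example"),
)

# `dataset_mce` would be a dataset MetadataChangeEventClass, e.g. the
# make_generic_dataset() helper used by the tests:
# outputs = list(transformer.transform([RecordEnvelope(dataset_mce, metadata={})]))
```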
+ + +class PatternDatasetTermsConfig(ConfigModel): + term_pattern: KeyValuePattern = KeyValuePattern.all() + + +class PatternAddDatasetTerms(AddDatasetTerms): + """Transformer that adds a specified set of glossary terms to each dataset.""" + + def __init__(self, config: PatternDatasetTermsConfig, ctx: PipelineContext): + term_pattern = config.term_pattern + generic_config = AddDatasetTermsConfig( + get_terms_to_add=lambda _: [ + GlossaryTermAssociationClass(urn=urn) + for urn in term_pattern.value(_.urn) + ], + ) + super().__init__(generic_config, ctx) + + @classmethod + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "PatternAddDatasetTerms": + config = PatternDatasetTermsConfig.parse_obj(config_dict) + return cls(config, ctx) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py b/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py index 3a764ada33136..4dd10c120d3ee 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py @@ -11,8 +11,14 @@ from datahub.ingestion.transformer.add_dataset_properties import AddDatasetProperties from datahub.ingestion.transformer.add_dataset_tags import ( AddDatasetTags, + PatternAddDatasetTags, SimpleAddDatasetTags, ) +from datahub.ingestion.transformer.add_dataset_terms import ( + AddDatasetTerms, + PatternAddDatasetTerms, + SimpleAddDatasetTerms, +) from datahub.ingestion.transformer.mark_dataset_status import MarkDatasetStatus from datahub.ingestion.transformer.remove_dataset_ownership import ( SimpleRemoveDatasetOwnership, @@ -32,5 +38,10 @@ transform_registry.register("add_dataset_tags", AddDatasetTags) transform_registry.register("simple_add_dataset_tags", SimpleAddDatasetTags) +transform_registry.register("pattern_add_dataset_tags", PatternAddDatasetTags) + +transform_registry.register("add_dataset_terms", AddDatasetTerms) +transform_registry.register("simple_add_dataset_terms", SimpleAddDatasetTerms) +transform_registry.register("pattern_add_dataset_terms", PatternAddDatasetTerms) transform_registry.register("add_dataset_properties", AddDatasetProperties) diff --git a/metadata-ingestion/src/datahub/telemetry/__init__.py b/metadata-ingestion/src/datahub/telemetry/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion/src/datahub/telemetry/telemetry.py b/metadata-ingestion/src/datahub/telemetry/telemetry.py new file mode 100644 index 0000000000000..4890d4f16c57d --- /dev/null +++ b/metadata-ingestion/src/datahub/telemetry/telemetry.py @@ -0,0 +1,145 @@ +import json +import logging +import os +import uuid +from functools import wraps +from pathlib import Path +from typing import Any, Callable, Dict, Optional, TypeVar, Union + +import requests + +import datahub as datahub_package + +logger = logging.getLogger(__name__) + +GA_VERSION = 1 +GA_TID = "UA-212728656-1" + +DATAHUB_FOLDER = Path(os.path.expanduser("~/.datahub")) + +CONFIG_FILE = DATAHUB_FOLDER / "telemetry-config.json" + +# also fall back to environment variable if config file is not found +ENV_ENABLED = os.environ.get("DATAHUB_TELEMETRY_ENABLED", "true").lower() == "true" + + +class Telemetry: + + client_id: str + enabled: bool = True + + def __init__(self): + + # init the client ID and config if it doesn't exist + if not CONFIG_FILE.exists(): + self.client_id = str(uuid.uuid4()) + self.update_config() + + else: + self.load_config() + + def update_config(self) -> 
None: + """ + Update the config file with the current client ID and enabled status. + """ + + if not DATAHUB_FOLDER.exists(): + os.makedirs(DATAHUB_FOLDER) + + with open(CONFIG_FILE, "w") as f: + json.dump( + {"client_id": self.client_id, "enabled": self.enabled}, f, indent=2 + ) + + def enable(self) -> None: + """ + Enable telemetry. + """ + + self.enabled = True + self.update_config() + + def disable(self) -> None: + """ + Disable telemetry. + """ + + self.enabled = False + self.update_config() + + def load_config(self): + """ + Load the saved config for the telemetry client ID and enabled status. + """ + + with open(CONFIG_FILE, "r") as f: + config = json.load(f) + self.client_id = config["client_id"] + self.enabled = config["enabled"] & ENV_ENABLED + + def ping( + self, + category: str, + action: str, + label: Optional[str] = None, + value: Optional[int] = None, + ) -> None: + """ + Ping Google Analytics with a single event. + + Args: + category (str): category for the event + action (str): action taken + label (Optional[str], optional): label for the event + value (Optional[int], optional): value for the event + """ + + if not self.enabled: + return + + req_url = "https://www.google-analytics.com/collect" + + params: Dict[str, Union[str, int]] = { + "an": "datahub-cli", # app name + "av": datahub_package.nice_version_name(), # app version + "t": "event", # event type + "v": GA_VERSION, # Google Analytics version + "tid": GA_TID, # tracking id + "cid": self.client_id, # client id + "ec": category, # event category + "ea": action, # event action + } + + if label: + params["el"] = label + + # this has to a non-negative int, otherwise the request will fail + if value: + params["ev"] = value + + try: + requests.post( + req_url, + data=params, + headers={ + "user-agent": f"datahub {datahub_package.nice_version_name()}" + }, + ) + except Exception as e: + + logger.debug(f"Error reporting telemetry: {e}") + + +telemetry_instance = Telemetry() + +T = TypeVar("T") + + +def with_telemetry(func: Callable[..., T]) -> Callable[..., T]: + @wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + res = func(*args, **kwargs) + telemetry_instance.ping(func.__module__, func.__name__) + return res + + return wrapper diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py index 9d30c13a89975..eab8e76e3e706 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py +++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py @@ -17,6 +17,8 @@ from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound from typing_extensions import ParamSpec +from datahub.ingestion.api.report import Report + logger: logging.Logger = logging.getLogger(__name__) P = ParamSpec("P") @@ -103,6 +105,27 @@ class _QueryFuture: exc: Optional[Exception] = None +def get_query_columns(query: Any) -> List[Any]: + try: + # inner_columns will be more accurate if the column names are unnamed, + # since .columns will remove the "duplicates". 
+ inner_columns = list(query.inner_columns) + return inner_columns + except AttributeError: + return list(query.columns) + + +@dataclasses.dataclass +class SQLAlchemyQueryCombinerReport(Report): + total_queries: int = 0 + uncombined_queries_issued: int = 0 + + combined_queries_issued: int = 0 + queries_combined: int = 0 + + query_exceptions: int = 0 + + @dataclasses.dataclass class SQLAlchemyQueryCombiner: """ @@ -116,6 +139,12 @@ class SQLAlchemyQueryCombiner: is_single_row_query_method: Callable[[Any], bool] serial_execution_fallback_enabled: bool + # The Python GIL ensures that modifications to the report's counters + # are safe. + report: SQLAlchemyQueryCombinerReport = dataclasses.field( + default_factory=SQLAlchemyQueryCombinerReport + ) + # There will be one main greenlet per thread. As such, queries will be # queued according to the main greenlet's thread ID. We also keep track # of the greenlets we spawn for bookkeeping purposes. @@ -160,7 +189,7 @@ def _get_greenlet_pool( def _handle_execute( self, conn: Connection, query: Any, multiparams: Any, params: Any - ) -> Tuple[bool, Any]: + ) -> Tuple[bool, Optional[_QueryFuture]]: # Returns True with result if the query was handled, False if it # should be executed normally using the fallback method. @@ -185,15 +214,14 @@ def _handle_execute( # Figure out how many columns this query returns. # This also implicitly ensures that the typing is generally correct. - if not hasattr(query, "columns"): - return False, None - assert len(query.columns) > 0 + assert len(get_query_columns(query)) > 0 # Add query to the queue. queue = self._get_queue(main_greenlet) query_id = SQLAlchemyQueryCombiner._generate_sql_safe_identifier() query_future = _QueryFuture(conn, query, multiparams, params) queue[query_id] = query_future + self.report.queries_combined += 1 # Yield control back to the main greenlet until the query is done. # We assume that the main greenlet will be the one that actually executes the query. @@ -201,9 +229,7 @@ def _handle_execute( main_greenlet.switch() del queue[query_id] - if query_future.exc is not None: - raise query_future.exc - return True, query_future.res + return True, query_future @contextlib.contextmanager def activate(self) -> Iterator["SQLAlchemyQueryCombiner"]: @@ -211,6 +237,7 @@ def _sa_execute_fake( conn: Connection, query: Any, *args: Any, **kwargs: Any ) -> Any: try: + self.report.total_queries += 1 handled, result = self._handle_execute(conn, query, args, kwargs) except Exception as e: if not self.catch_exceptions: @@ -218,13 +245,18 @@ def _sa_execute_fake( logger.exception( f"Failed to execute query normally, using fallback: {str(query)}" ) + self.report.query_exceptions += 1 return _sa_execute_underlying_method(conn, query, *args, **kwargs) else: if handled: logger.debug(f"Query was handled: {str(query)}") - return result + assert result is not None + if result.exc is not None: + raise result.exc + return result.res else: logger.debug(f"Executing query normally: {str(query)}") + self.report.uncombined_queries_issued += 1 return _sa_execute_underlying_method(conn, query, *args, **kwargs) with _sa_execute_method_patching_lock: @@ -266,15 +298,21 @@ def _execute_queue(self, main_greenlet: greenlet.greenlet) -> None: for k, query_future in pending_queue.items() } - # TODO: determine if we need to use col.label() here. 
combined_cols = itertools.chain( - *[[col for col in cte.columns] for _, cte in ctes.items()] + *[ + [ + col # .label(self._generate_sql_safe_identifier()) + for col in get_query_columns(cte) + ] + for _, cte in ctes.items() + ] ) combined_query = sqlalchemy.select(combined_cols) for cte in ctes.values(): combined_query.append_from(cte) logger.debug(f"Executing combined query: {str(combined_query)}") + self.report.combined_queries_issued += 1 sa_res = _sa_execute_underlying_method(queue_item.conn, combined_query) # Fetch the results and ensure that exactly one row is returned. @@ -308,6 +346,7 @@ def _execute_queue_fallback(self, main_greenlet: greenlet.greenlet) -> None: continue logger.debug(f"Executing query via fallback: {str(query_future.query)}") + self.report.uncombined_queries_issued += 1 try: res = _sa_execute_underlying_method( query_future.conn, @@ -337,6 +376,7 @@ def flush(self) -> None: if not self.serial_execution_fallback_enabled: raise e logger.exception(f"Failed to execute queue using combiner: {str(e)}") + self.report.query_exceptions += 1 self._execute_queue_fallback(main_greenlet) for let in list(pool): diff --git a/metadata-ingestion/tests/conftest.py b/metadata-ingestion/tests/conftest.py index 51992268035e0..b85c47d078bf3 100644 --- a/metadata-ingestion/tests/conftest.py +++ b/metadata-ingestion/tests/conftest.py @@ -16,6 +16,9 @@ logging.getLogger().setLevel(logging.DEBUG) os.putenv("DATAHUB_DEBUG", "1") +# Disable telemetry +os.putenv("DATAHUB_TELEMETRY_ENABLED", "false") + @pytest.fixture def mock_time(monkeypatch): diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 53812a6bd73c6..8fa53250a9826 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -20,8 +20,13 @@ ) from datahub.ingestion.transformer.add_dataset_tags import ( AddDatasetTags, + PatternAddDatasetTags, SimpleAddDatasetTags, ) +from datahub.ingestion.transformer.add_dataset_terms import ( + PatternAddDatasetTerms, + SimpleAddDatasetTerms, +) from datahub.ingestion.transformer.mark_dataset_status import MarkDatasetStatus from datahub.ingestion.transformer.remove_dataset_ownership import ( SimpleRemoveDatasetOwnership, @@ -314,6 +319,41 @@ def dummy_tag_resolver_method(dataset_snapshot): return [] +def test_pattern_dataset_tags_transformation(mock_time): + dataset_mce = make_generic_dataset() + + transformer = PatternAddDatasetTags.create( + { + "tag_pattern": { + "rules": { + ".*example1.*": [ + builder.make_tag_urn("Private"), + builder.make_tag_urn("Legacy"), + ], + ".*example2.*": [builder.make_tag_urn("Needs Documentation")], + } + }, + }, + PipelineContext(run_id="test-tags"), + ) + + outputs = list( + transformer.transform( + [RecordEnvelope(input, metadata={}) for input in [dataset_mce]] + ) + ) + + assert len(outputs) == 1 + # Check that tags were added. 
+ tags_aspect = builder.get_aspect_if_available( + outputs[0].record, models.GlobalTagsClass + ) + assert tags_aspect + assert len(tags_aspect.tags) == 2 + assert tags_aspect.tags[0].tag == builder.make_tag_urn("Private") + assert builder.make_tag_urn("Needs Documentation") not in tags_aspect.tags + + def test_import_resolver(): transformer = AddDatasetTags.create( { @@ -599,3 +639,67 @@ def test_add_dataset_properties(mock_time): **EXISTING_PROPERTIES, **PROPERTIES_TO_ADD, } + + +def test_simple_dataset_terms_transformation(mock_time): + dataset_mce = make_generic_dataset() + + transformer = SimpleAddDatasetTerms.create( + { + "term_urns": [ + builder.make_term_urn("Test"), + builder.make_term_urn("Needs Review"), + ] + }, + PipelineContext(run_id="test-terms"), + ) + + outputs = list( + transformer.transform( + [RecordEnvelope(input, metadata={}) for input in [dataset_mce]] + ) + ) + assert len(outputs) == 1 + + # Check that glossary terms were added. + terms_aspect = builder.get_aspect_if_available( + outputs[0].record, models.GlossaryTermsClass + ) + assert terms_aspect + assert len(terms_aspect.terms) == 2 + assert terms_aspect.terms[0].urn == builder.make_term_urn("Test") + + +def test_pattern_dataset_terms_transformation(mock_time): + dataset_mce = make_generic_dataset() + + transformer = PatternAddDatasetTerms.create( + { + "term_pattern": { + "rules": { + ".*example1.*": [ + builder.make_term_urn("AccountBalance"), + builder.make_term_urn("Email"), + ], + ".*example2.*": [builder.make_term_urn("Address")], + } + }, + }, + PipelineContext(run_id="test-terms"), + ) + + outputs = list( + transformer.transform( + [RecordEnvelope(input, metadata={}) for input in [dataset_mce]] + ) + ) + + assert len(outputs) == 1 + # Check that glossary terms were added. + terms_aspect = builder.get_aspect_if_available( + outputs[0].record, models.GlossaryTermsClass + ) + assert terms_aspect + assert len(terms_aspect.terms) == 2 + assert terms_aspect.terms[0].urn == builder.make_term_urn("AccountBalance") + assert builder.make_term_urn("AccountBalance") not in terms_aspect.terms diff --git a/metadata-ingestion/transformers.md b/metadata-ingestion/transformers.md index 015db4ac77311..5534ef15c1268 100644 --- a/metadata-ingestion/transformers.md +++ b/metadata-ingestion/transformers.md @@ -8,7 +8,7 @@ Moreover, a transformer allows one to have fine-grained control over the metadat ## Provided transformers -Aside from the option of writing your own transformer (see below), we provide some simple transformers for the use cases of adding: dataset tags, dataset properties and ownership information. +Aside from the option of writing your own transformer (see below), we provide some simple transformers for the use cases of adding: dataset tags, dataset glossary terms, dataset properties and ownership information. ### Adding a set of tags @@ -25,6 +25,24 @@ transformers: - "urn:li:tag:Legacy" ``` +### Adding tags by dataset urn pattern + +Let’s suppose we’d like to append a series of tags to specific datasets. To do so, we can use the `pattern_add_dataset_tags` module that’s included in the ingestion framework. This will match the regex pattern to `urn` of the dataset and assign the respective tags urns given in the array. 
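Before the config example that follows, here is a rough sketch of how such a rule set resolves for a single dataset urn, based on the `KeyValuePattern` model that `PatternAddDatasetTags` uses (see `add_dataset_tags.py` earlier in this diff). The regexes and urns are illustrative, and the matching semantics are inferred from that code rather than documented here.

```python
# Illustration only: resolving pattern rules against a dataset urn, using the
# KeyValuePattern model referenced by PatternAddDatasetTags in this diff.
# The regexes and urns are examples, not values mandated by DataHub.
from datahub.configuration.common import KeyValuePattern

pattern = KeyValuePattern.parse_obj(
    {
        "rules": {
            ".*example1.*": ["urn:li:tag:NeedsDocumentation", "urn:li:tag:Legacy"],
            ".*example2.*": ["urn:li:tag:NeedsDocumentation"],
        }
    }
)

# value() returns the urns configured for the rule(s) whose regex matches this urn;
# PatternAddDatasetTags then wraps each one in a TagAssociationClass.
print(pattern.value("urn:li:dataset:(urn:li:dataPlatform:bigquery,project.example1.table,PROD)"))
```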
+ +The config, which we’d append to our ingestion recipe YAML, would look like this: + +```yaml +transformers: + - type: "pattern_add_dataset_tags" + config: + tag_pattern: + rules: + ".*example1.*": ["urn:li:tag:NeedsDocumentation", "urn:li:tag:Legacy"] + ".*example2.*": ["urn:li:tag:NeedsDocumentation"] +``` + +### Add your own custom Transformer + If you'd like to add more complex logic for assigning tags, you can use the more generic add_dataset_tags transformer, which calls a user-provided function to determine the tags for each dataset. ```yaml @@ -70,6 +88,36 @@ def custom_tags(current: DatasetSnapshotClass) -> List[TagAssociationClass]: logging.info(f"Tagging dataset {current.urn} with {tag_strings}.") return tags ``` +Finally, you can install and use your custom transformer as [shown here](#installing-the-package). + +### Adding a set of glossary terms + +We can use a similar convention to associate [Glossary Terms](https://datahubproject.io/docs/metadata-ingestion/source_docs/business_glossary) to datasets. We can use the `simple_add_dataset_terms` module that’s included in the ingestion framework. + +The config, which we’d append to our ingestion recipe YAML, would look like this: + +```yaml +transformers: + - type: "simple_add_dataset_terms" + config: + term_urns: + - "urn:li:glossaryTerm:Email" + - "urn:li:glossaryTerm:Address" +``` + +### Adding glossary terms by dataset urn pattern + +Similar to the above example with tags, we can add glossary terms to datasets based on a regex filter. + +```yaml +transformers: + - type: "pattern_add_dataset_terms" + config: + term_pattern: + rules: + ".*example1.*": ["urn:li:glossaryTerm:Email", "urn:li:glossaryTerm:Address"] + ".*example2.*": ["urn:li:glossaryTerm:PostalCode"] +``` ### Change owners @@ -102,7 +150,7 @@ Note `ownership_type` is an optional field with `DATAOWNER` as default value. ### Setting ownership by dataset urn pattern -Let’s suppose we’d like to append a series of users who we know to own a different dataset from a data source but aren't detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module that’s included in the ingestion framework. This will match the pattern to `urn` of the dataset and assign the respective owners. +Again, let’s suppose we’d like to append a series of users who we know to own a different dataset from a data source but aren't detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module that’s included in the ingestion framework. This will match the pattern to `urn` of the dataset and assign the respective owners. 
The config, which we’d append to our ingestion recipe YAML, would look like this: diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/EntityHydratorConfig.java b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/EntityHydratorConfig.java deleted file mode 100644 index f5be9a70bb906..0000000000000 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/EntityHydratorConfig.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.linkedin.metadata.kafka.config; - -import com.datahub.authentication.Authentication; -import com.linkedin.entity.client.RestliEntityClient; -import com.linkedin.metadata.kafka.hydrator.EntityHydrator; -import com.linkedin.metadata.restli.DefaultRestliClientFactory; -import com.linkedin.restli.client.Client; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - - -@Configuration -public class EntityHydratorConfig { - - @Value("${GMS_HOST:localhost}") - private String gmsHost; - @Value("${GMS_PORT:8080}") - private int gmsPort; - @Value("${GMS_USE_SSL:false}") - private boolean gmsUseSSL; - @Value("${GMS_SSL_PROTOCOL:#{null}}") - private String gmsSslProtocol; - @Autowired - @Qualifier("systemAuthentication") - private Authentication systemAuthentication; - - @Bean - public EntityHydrator getEntityHydrator() { - Client restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol); - RestliEntityClient entityClient = new RestliEntityClient(restClient); - return new EntityHydrator(this.systemAuthentication, entityClient); - } -} diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java index fef9487a20d9b..1d6dac13293f1 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/DataHubUsageEventsProcessor.java @@ -3,6 +3,7 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.linkedin.events.metadata.ChangeType; +import com.linkedin.gms.factory.kafka.SimpleKafkaConsumerFactory; import com.linkedin.metadata.kafka.config.DataHubUsageEventsProcessorCondition; import com.linkedin.metadata.kafka.elasticsearch.ElasticsearchConnector; import com.linkedin.metadata.kafka.elasticsearch.JsonElasticEvent; @@ -16,6 +17,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Import; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -25,26 +27,24 @@ @Component @EnableKafka @Conditional(DataHubUsageEventsProcessorCondition.class) +@Import({SimpleKafkaConsumerFactory.class}) public class DataHubUsageEventsProcessor { private final ElasticsearchConnector elasticSearchConnector; private final DataHubUsageEventTransformer dataHubUsageEventTransformer; private final String indexName; - private final Histogram kafkaLagStats = - 
MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag")); + private final Histogram kafkaLagStats = MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag")); - public DataHubUsageEventsProcessor( - ElasticsearchConnector elasticSearchConnector, - DataHubUsageEventTransformer dataHubUsageEventTransformer, - IndexConvention indexConvention) { + public DataHubUsageEventsProcessor(ElasticsearchConnector elasticSearchConnector, + DataHubUsageEventTransformer dataHubUsageEventTransformer, IndexConvention indexConvention) { this.elasticSearchConnector = elasticSearchConnector; this.dataHubUsageEventTransformer = dataHubUsageEventTransformer; this.indexName = indexConvention.getIndexName("datahub_usage_event"); } @KafkaListener(id = "${DATAHUB_USAGE_EVENT_KAFKA_CONSUMER_GROUP_ID:datahub-usage-event-consumer-job-client}", topics = - "${DATAHUB_USAGE_EVENT_NAME:" + Topics.DATAHUB_USAGE_EVENT + "}", containerFactory = "stringSerializedKafkaListener") + "${DATAHUB_USAGE_EVENT_NAME:" + Topics.DATAHUB_USAGE_EVENT + "}", containerFactory = "simpleKafkaConsumer") public void consume(final ConsumerRecord consumerRecord) { kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); final String record = consumerRecord.value(); diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java index 26b4e37067643..20e9358b451b7 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java @@ -7,6 +7,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.gms.factory.common.GraphServiceFactory; import com.linkedin.gms.factory.common.SystemMetadataServiceFactory; +import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory; import com.linkedin.gms.factory.search.EntitySearchServiceFactory; import com.linkedin.gms.factory.search.SearchDocumentTransformerFactory; import com.linkedin.metadata.EventUtils; @@ -59,7 +60,7 @@ @Component @Conditional(MetadataChangeLogProcessorCondition.class) @Import({GraphServiceFactory.class, EntitySearchServiceFactory.class, SystemMetadataServiceFactory.class, - SearchDocumentTransformerFactory.class}) + SearchDocumentTransformerFactory.class, KafkaEventConsumerFactory.class}) @EnableKafka public class MetadataAuditEventsProcessor { @@ -84,7 +85,7 @@ public MetadataAuditEventsProcessor(GraphService graphService, EntitySearchServi } @KafkaListener(id = "${METADATA_AUDIT_EVENT_KAFKA_CONSUMER_GROUP_ID:mae-consumer-job-client}", topics = - "${KAFKA_TOPIC_NAME:" + Topics.METADATA_AUDIT_EVENT + "}", containerFactory = "avroSerializedKafkaListener") + "${KAFKA_TOPIC_NAME:" + Topics.METADATA_AUDIT_EVENT + "}", containerFactory = "kafkaEventConsumer") public void consume(final ConsumerRecord consumerRecord) { kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java index bf5de456008de..3df482d266d07 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java +++ 
b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeLogProcessor.java @@ -10,6 +10,7 @@ import com.linkedin.gms.factory.common.GraphServiceFactory; import com.linkedin.gms.factory.common.SystemMetadataServiceFactory; import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory; +import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory; import com.linkedin.gms.factory.search.EntitySearchServiceFactory; import com.linkedin.gms.factory.search.SearchDocumentTransformerFactory; import com.linkedin.gms.factory.timeseries.TimeseriesAspectServiceFactory; @@ -64,7 +65,8 @@ @Component @Conditional(MetadataChangeLogProcessorCondition.class) @Import({GraphServiceFactory.class, EntitySearchServiceFactory.class, TimeseriesAspectServiceFactory.class, - EntityRegistryFactory.class, SystemMetadataServiceFactory.class, SearchDocumentTransformerFactory.class}) + EntityRegistryFactory.class, SystemMetadataServiceFactory.class, SearchDocumentTransformerFactory.class, + KafkaEventConsumerFactory.class}) @EnableKafka public class MetadataChangeLogProcessor { @@ -94,7 +96,7 @@ public MetadataChangeLogProcessor(GraphService graphService, EntitySearchService @KafkaListener(id = "${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}", topics = { "${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}", "${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES - + "}"}, containerFactory = "avroSerializedKafkaListener") + + "}"}, containerFactory = "kafkaEventConsumer") public void consume(final ConsumerRecord consumerRecord) { kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); final GenericRecord record = consumerRecord.value(); diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/EntityHydratorConfig.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/EntityHydratorConfig.java index 86e58ecd919c6..2d8c52566e2ae 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/EntityHydratorConfig.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/EntityHydratorConfig.java @@ -2,6 +2,7 @@ import com.datahub.authentication.Authentication; import com.linkedin.entity.client.RestliEntityClient; +import com.linkedin.gms.factory.auth.SystemAuthenticationFactory; import com.linkedin.gms.factory.entity.RestliEntityClientFactory; import com.linkedin.metadata.kafka.hydrator.EntityHydrator; import org.springframework.beans.factory.annotation.Autowired; @@ -12,7 +13,7 @@ @Configuration -@Import({RestliEntityClientFactory.class}) +@Import({RestliEntityClientFactory.class, SystemAuthenticationFactory.class}) public class EntityHydratorConfig { @Autowired diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/MaeKafkaConfig.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/MaeKafkaConfig.java deleted file mode 100644 index b054f4086aaa5..0000000000000 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/config/MaeKafkaConfig.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.linkedin.metadata.kafka.config; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.time.Duration; -import java.util.Arrays; -import java.util.Map; -import lombok.extern.slf4j.Slf4j; -import 
org.apache.avro.generic.GenericRecord; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; - - -@Slf4j -@Configuration -public class MaeKafkaConfig { - @Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}") - private String kafkaBootstrapServer; - @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}") - private String kafkaSchemaRegistryUrl; - - @Bean(name = "avroSerializedKafkaListener") - public KafkaListenerContainerFactory avroSerializedKafkaListenerContainerFactory(KafkaProperties properties) { - return createKafkaListenerContainerFactory(properties, KafkaAvroDeserializer.class); - } - - @Bean(name = "stringSerializedKafkaListener") - public KafkaListenerContainerFactory stringSerializedKafkaListenerContainerFactory(KafkaProperties properties) { - return createKafkaListenerContainerFactory(properties, StringDeserializer.class); - } - - public KafkaListenerContainerFactory createKafkaListenerContainerFactory(KafkaProperties properties, Class valueDeserializer) { - KafkaProperties.Consumer consumerProps = properties.getConsumer(); - - // Specify (de)serializers for record keys and for record values. - consumerProps.setKeyDeserializer(StringDeserializer.class); - consumerProps.setValueDeserializer(valueDeserializer); - // Records will be flushed every 10 seconds. 
- consumerProps.setEnableAutoCommit(true); - consumerProps.setAutoCommitInterval(Duration.ofSeconds(10)); - - Map props = properties.buildConsumerProperties(); - - // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS - if (kafkaBootstrapServer != null && kafkaBootstrapServer.length() > 0) { - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(kafkaBootstrapServer.split(","))); - } // else we rely on KafkaProperties which defaults to localhost:9092 - - props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl); - - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props)); - - log.info("KafkaListenerContainerFactory built successfully"); - - return factory; - } -} diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java index cab2cc01761c6..d15ec0e542df9 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java @@ -4,10 +4,11 @@ import com.codahale.metrics.MetricRegistry; import com.datahub.authentication.Authentication; import com.linkedin.entity.Entity; -import com.linkedin.entity.client.EntityClient; import com.linkedin.entity.client.RestliEntityClient; import com.linkedin.gms.factory.auth.SystemAuthenticationFactory; import com.linkedin.gms.factory.entity.RestliEntityClientFactory; +import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory; +import com.linkedin.gms.factory.kafka.KafkaEventProducerFactory; import com.linkedin.metadata.EventUtils; import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition; import com.linkedin.metadata.snapshot.Snapshot; @@ -18,47 +19,42 @@ import com.linkedin.r2.RemoteInvocationException; import java.io.IOException; import javax.annotation.Nonnull; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Import; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Slf4j @Component @Conditional(MetadataChangeProposalProcessorCondition.class) -@Import({RestliEntityClientFactory.class, SystemAuthenticationFactory.class}) +@Import({RestliEntityClientFactory.class, SystemAuthenticationFactory.class, KafkaEventConsumerFactory.class, + KafkaEventProducerFactory.class}) @EnableKafka +@RequiredArgsConstructor public class MetadataChangeEventsProcessor { - private Authentication systemAuthentication; - private EntityClient entityClient; - private KafkaTemplate kafkaTemplate; + private final Authentication systemAuthentication; + private final RestliEntityClient entityClient; 
+ private final Producer kafkaProducer; - private final Histogram kafkaLagStats = - MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag")); + private final Histogram kafkaLagStats = MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag")); @Value("${KAFKA_FMCE_TOPIC_NAME:" + Topics.FAILED_METADATA_CHANGE_EVENT + "}") private String fmceTopicName; - public MetadataChangeEventsProcessor( - @Nonnull final Authentication systemAuthentication, - @Nonnull final RestliEntityClient entityClient, - @Nonnull final KafkaTemplate kafkaTemplate) { - this.systemAuthentication = systemAuthentication; - this.entityClient = entityClient; - this.kafkaTemplate = kafkaTemplate; - } - @KafkaListener(id = "${METADATA_CHANGE_EVENT_KAFKA_CONSUMER_GROUP_ID:mce-consumer-job-client}", topics = - "${KAFKA_MCE_TOPIC_NAME:" + Topics.METADATA_CHANGE_EVENT + "}", containerFactory = "mceKafkaContainerFactory") + "${KAFKA_MCE_TOPIC_NAME:" + Topics.METADATA_CHANGE_EVENT + "}", containerFactory = "kafkaEventConsumer") public void consume(final ConsumerRecord consumerRecord) { kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); final GenericRecord record = consumerRecord.value(); @@ -85,7 +81,7 @@ private void sendFailedMCE(@Nonnull MetadataChangeEvent event, @Nonnull Throwabl final GenericRecord genericFailedMCERecord = EventUtils.pegasusToAvroFailedMCE(failedMetadataChangeEvent); log.debug("Sending FailedMessages to topic - {}", fmceTopicName); log.info("Error while processing MCE: FailedMetadataChangeEvent - {}", failedMetadataChangeEvent); - this.kafkaTemplate.send(fmceTopicName, genericFailedMCERecord); + kafkaProducer.send(new ProducerRecord<>(fmceTopicName, genericFailedMCERecord)); } catch (IOException e) { log.error("Error while sending FailedMetadataChangeEvent: Exception - {}, FailedMetadataChangeEvent - {}", e.getStackTrace(), failedMetadataChangeEvent); @@ -101,7 +97,8 @@ private FailedMetadataChangeEvent createFailedMCEEvent(@Nonnull MetadataChangeEv return fmce; } - private void processProposedSnapshot(@Nonnull MetadataChangeEvent metadataChangeEvent) throws RemoteInvocationException { + private void processProposedSnapshot(@Nonnull MetadataChangeEvent metadataChangeEvent) + throws RemoteInvocationException { final Snapshot snapshotUnion = metadataChangeEvent.getProposedSnapshot(); final Entity entity = new Entity().setValue(snapshotUnion); // TODO: GMS Auth Part 2: Get the actor identity from the event header itself. 
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java index 1a589846de9de..e46e4315bb007 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java @@ -3,10 +3,11 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.datahub.authentication.Authentication; -import com.linkedin.entity.client.EntityClient; import com.linkedin.entity.client.RestliEntityClient; import com.linkedin.gms.factory.auth.SystemAuthenticationFactory; import com.linkedin.gms.factory.entity.RestliEntityClientFactory; +import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory; +import com.linkedin.gms.factory.kafka.KafkaEventProducerFactory; import com.linkedin.metadata.EventUtils; import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition; import com.linkedin.metadata.utils.metrics.MetricUtils; @@ -15,48 +16,43 @@ import com.linkedin.mxe.Topics; import java.io.IOException; import javax.annotation.Nonnull; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Import; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Slf4j @Component -@Import({RestliEntityClientFactory.class, SystemAuthenticationFactory.class}) +@Import({RestliEntityClientFactory.class, SystemAuthenticationFactory.class, KafkaEventConsumerFactory.class, + KafkaEventProducerFactory.class}) @Conditional(MetadataChangeProposalProcessorCondition.class) @EnableKafka +@RequiredArgsConstructor public class MetadataChangeProposalsProcessor { - private Authentication systemAuthentication; // TODO: Consider whether - private EntityClient entityClient; - private KafkaTemplate kafkaTemplate; + private final Authentication systemAuthentication; + private final RestliEntityClient entityClient; + private final Producer kafkaProducer; - private final Histogram kafkaLagStats = - MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag")); + private final Histogram kafkaLagStats = MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag")); @Value("${FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.FAILED_METADATA_CHANGE_PROPOSAL + "}") private String fmcpTopicName; - public MetadataChangeProposalsProcessor( - @Nonnull final Authentication systemAuthentication, - @Nonnull final RestliEntityClient entityClient, - @Nonnull final KafkaTemplate kafkaTemplate) { - this.systemAuthentication = systemAuthentication; - this.entityClient = entityClient; - this.kafkaTemplate = kafkaTemplate; - } - @KafkaListener(id = 
"${METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID:generic-mce-consumer-job-client}", topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL - + "}", containerFactory = "mceKafkaContainerFactory") + + "}", containerFactory = "kafkaEventConsumer") public void consume(final ConsumerRecord consumerRecord) { kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); final GenericRecord record = consumerRecord.value(); @@ -81,7 +77,7 @@ private void sendFailedMCP(@Nonnull MetadataChangeProposal event, @Nonnull Throw final GenericRecord genericFailedMCERecord = EventUtils.pegasusToAvroFailedMCP(failedMetadataChangeProposal); log.debug("Sending FailedMessages to topic - {}", fmcpTopicName); log.info("Error while processing FMCP: FailedMetadataChangeProposal - {}", failedMetadataChangeProposal); - this.kafkaTemplate.send(fmcpTopicName, genericFailedMCERecord); + kafkaProducer.send(new ProducerRecord<>(fmcpTopicName, genericFailedMCERecord)); } catch (IOException e) { log.error("Error while sending FailedMetadataChangeProposal: Exception - {}, FailedMetadataChangeProposal - {}", e.getStackTrace(), failedMetadataChangeProposal); diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MceKafkaConfig.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MceKafkaConfig.java deleted file mode 100644 index 15d264dc9695f..0000000000000 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MceKafkaConfig.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.linkedin.metadata.kafka.config; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import java.time.Duration; -import java.util.Arrays; -import java.util.Map; -import lombok.extern.slf4j.Slf4j; -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.listener.ErrorHandler; - - -@Slf4j -@Configuration -public class MceKafkaConfig { - - @Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}") - private String kafkaBootstrapServers; - @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}") - private String kafkaSchemaRegistryUrl; - - @Bean(name = "mceKafkaContainerFactory") - public KafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaProperties properties) { - KafkaProperties.Consumer consumerProps = properties.getConsumer(); - - // Specify (de)serializers for record keys and for record values. - consumerProps.setKeyDeserializer(StringDeserializer.class); - consumerProps.setValueDeserializer(KafkaAvroDeserializer.class); - // Records will be flushed every 10 seconds. 
-    consumerProps.setEnableAutoCommit(true);
-    consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));
-
-    // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
-    if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
-      consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
-    } // else we rely on KafkaProperties which defaults to localhost:9092
-
-    Map props = properties.buildConsumerProperties();
-    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
-
-    ConcurrentKafkaListenerContainerFactory factory =
-        new ConcurrentKafkaListenerContainerFactory<>();
-    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
-
-    log.info("KafkaListenerContainerFactory built successfully");
-
-    return factory;
-  }
-
-  @Bean
-  public KafkaTemplate kafkaTemplate(KafkaProperties properties) {
-    KafkaProperties.Producer producerProps = properties.getProducer();
-
-    producerProps.setKeySerializer(StringSerializer.class);
-    producerProps.setValueSerializer(KafkaAvroSerializer.class);
-
-    // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
-    if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
-      producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
-    } // else we rely on KafkaProperties which defaults to localhost:9092
-
-    Map props = properties.buildProducerProperties();
-    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
-
-    KafkaTemplate template =
-        new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
-
-    log.info("KafkaTemplate built successfully");
-
-    return template;
-  }
-
-  @Bean
-  public ErrorHandler errorHandler() {
-    return (e, r) -> log.error("Exception caught during Deserialization, topic: {}, partition: {}, offset: {}",
-        r.topic(), r.partition(), r.offset(), e);
-  }
-}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/auth/AuthorizationManagerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/auth/AuthorizationManagerFactory.java
index 509a691583e48..0f11678abb3d7 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/auth/AuthorizationManagerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/auth/AuthorizationManagerFactory.java
@@ -2,16 +2,16 @@
 import com.datahub.authentication.Authentication;
 import com.datahub.authorization.AuthorizationManager;
-import com.linkedin.entity.client.EntityClient;
-import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
+import com.linkedin.entity.client.JavaEntityClient;
 import com.linkedin.entity.client.OwnershipClient;
+import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
 import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
 import javax.annotation.Nonnull;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
 import org.springframework.context.annotation.PropertySource;
 import org.springframework.context.annotation.Scope;
@@ -28,7 +28,7 @@ public class AuthorizationManagerFactory {
   @Autowired
@Qualifier("javaEntityClient") - private EntityClient entityClient; + private JavaEntityClient entityClient; @Value("${authorizationManager.cacheRefreshIntervalSecs}") private Integer policyCacheRefreshIntervalSeconds; @@ -41,12 +41,12 @@ public class AuthorizationManagerFactory { @Nonnull protected AuthorizationManager getInstance() { - final AuthorizationManager.AuthorizationMode mode = policiesEnabled - ? AuthorizationManager.AuthorizationMode.DEFAULT + final AuthorizationManager.AuthorizationMode mode = policiesEnabled ? AuthorizationManager.AuthorizationMode.DEFAULT : AuthorizationManager.AuthorizationMode.ALLOW_ALL; final OwnershipClient ownershipClient = new OwnershipClient(entityClient); - return new AuthorizationManager(systemAuthentication, entityClient, ownershipClient, 10, policyCacheRefreshIntervalSeconds, mode); + return new AuthorizationManager(systemAuthentication, entityClient, ownershipClient, 10, + policyCacheRefreshIntervalSeconds, mode); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/RestliEntityClientFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/RestliEntityClientFactory.java index 4b877984d2140..a7a911c68c2d4 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/RestliEntityClientFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/RestliEntityClientFactory.java @@ -14,16 +14,16 @@ @PropertySource(value = "classpath:/application.yml", factory = YamlPropertySourceFactory.class) public class RestliEntityClientFactory { - @Value("${DATAHUB_GMS_HOST:localhost}") + @Value("${datahub.gms.host}") private String gmsHost; - @Value("${DATAHUB_GMS_PORT:8080}") + @Value("${datahub.gms.port}") private int gmsPort; - @Value("${DATAHUB_GMS_USE_SSL:false}") + @Value("${datahub.gms.useSSL}") private boolean gmsUseSSL; - @Value("${DATAHUB_GMS_SSL_PROTOCOL:#{null}}") + @Value("${datahub.gms.sslContext.protocol}") private String gmsSslProtocol; @Bean("restliEntityClient") diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java new file mode 100644 index 0000000000000..fc71197f30f06 --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java @@ -0,0 +1,51 @@ +package com.linkedin.gms.factory.kafka; + +import java.time.Duration; +import java.util.Arrays; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + + +@Slf4j +@Configuration +@EnableConfigurationProperties(KafkaProperties.class) +public class SimpleKafkaConsumerFactory { + + @Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}") + private String kafkaBootstrapServers; + + @Bean(name = "simpleKafkaConsumer") + 
+  protected KafkaListenerContainerFactory createInstance(KafkaProperties properties) {
+
+    KafkaProperties.Consumer consumerProps = properties.getConsumer();
+
+    // Specify (de)serializers for record keys and for record values.
+    consumerProps.setKeyDeserializer(StringDeserializer.class);
+    consumerProps.setValueDeserializer(StringDeserializer.class);
+    // Records will be flushed every 10 seconds.
+    consumerProps.setEnableAutoCommit(true);
+    consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));
+
+    // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
+    if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
+      consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
+    } // else we rely on KafkaProperties which defaults to localhost:9092
+
+    ConcurrentKafkaListenerContainerFactory factory =
+        new ConcurrentKafkaListenerContainerFactory<>();
+    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties()));
+
+    log.info("Simple KafkaListenerContainerFactory built successfully");
+
+    return factory;
+  }
+}
\ No newline at end of file
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/BaseElasticSearchComponentsFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/BaseElasticSearchComponentsFactory.java
index d083c8bec4e43..9c84b74363966 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/BaseElasticSearchComponentsFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/BaseElasticSearchComponentsFactory.java
@@ -21,7 +21,8 @@
  * Factory for components required for any services using elasticsearch
  */
 @Configuration
-@Import({RestHighLevelClientFactory.class})
+@Import({RestHighLevelClientFactory.class, IndexConventionFactory.class, ElasticSearchBulkProcessorFactory.class,
+    ElasticSearchIndexBuilderFactory.class})
 @PropertySource(value = "classpath:/application.yml", factory = YamlPropertySourceFactory.class)
 public class BaseElasticSearchComponentsFactory {
   @Value
diff --git a/metadata-service/factories/src/main/resources/application.yml b/metadata-service/factories/src/main/resources/application.yml
index 9aa08b31a47c5..71c80f1a84676 100644
--- a/metadata-service/factories/src/main/resources/application.yml
+++ b/metadata-service/factories/src/main/resources/application.yml
@@ -24,7 +24,13 @@ authentication:
   # The max duration of a UI session in milliseconds. Defaults to 1 day.
   sessionTokenDurationMs: ${SESSION_TOKEN_DURATION_MS:86400000}
-
+datahub:
+  gms:
+    host: ${DATAHUB_GMS_HOST:${GMS_HOST:localhost}}
+    port: ${DATAHUB_GMS_PORT:${GMS_PORT:8080}}
+    useSSL: ${DATAHUB_GMS_USE_SSL:${GMS_USE_SSL:false}}
+    sslContext:
+      protocol: ${DATAHUB_GMS_SSL_PROTOCOL:${GMS_SSL_PROTOCOL:#{null}}}
 graphService:
   type: ${GRAPH_SERVICE_IMPL:elasticsearch}
diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/configs/BaseSearchConfigWithConvention.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/configs/BaseSearchConfigWithConvention.java
deleted file mode 100644
index 763279a234e58..0000000000000
--- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/configs/BaseSearchConfigWithConvention.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.linkedin.metadata.configs;
-
-import com.linkedin.data.template.RecordTemplate;
-import com.linkedin.metadata.dao.search.BaseSearchConfig;
-import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-
-/**
- * {@link BaseSearchConfig} that uses {@link IndexConvention} to compute the name of the search index
- * @param
- */
-public abstract class BaseSearchConfigWithConvention
-    extends BaseSearchConfig {
-
-  @Nullable
-  private final IndexConvention _indexConvention;
-
-  protected BaseSearchConfigWithConvention() {
-    _indexConvention = null;
-  }
-
-  protected BaseSearchConfigWithConvention(@Nullable IndexConvention indexConvention) {
-    _indexConvention = indexConvention;
-  }
-
-  @Nonnull
-  @Override
-  public String getIndexName() {
-    if (_indexConvention == null) {
-      return super.getIndexName();
-    }
-    return _indexConvention.getIndexName(getSearchDocument());
-  }
-}
diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/configs/BrowseConfigFactory.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/configs/BrowseConfigFactory.java
deleted file mode 100644
index a713dc3795466..0000000000000
--- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/configs/BrowseConfigFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package com.linkedin.metadata.configs;
-
-import com.linkedin.data.template.RecordTemplate;
-import com.linkedin.metadata.dao.browse.BaseBrowseConfig;
-import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
-import javax.annotation.Nonnull;
-
-
-public class BrowseConfigFactory {
-  private BrowseConfigFactory() {
-  }
-
-  public static BaseBrowseConfig getBrowseConfig(Class clazz,
-      IndexConvention indexConvention) {
-    return new BaseBrowseConfig() {
-      @Override
-      public Class getSearchDocument() {
-        return clazz;
-      }
-
-      @Nonnull
-      @Override
-      public String getIndexName() {
-        return indexConvention.getIndexName(clazz);
-      }
-    };
-  }
-}
diff --git a/smoke-test/tests/conftest.py b/smoke-test/tests/conftest.py
index ff8ebe6de187f..7d0f3c159864b 100644
--- a/smoke-test/tests/conftest.py
+++ b/smoke-test/tests/conftest.py
@@ -1,3 +1,4 @@
+import os
 import time
 import pytest
@@ -7,6 +8,9 @@
 from datahub.ingestion.run.pipeline import Pipeline
 from tests.utils import FRONTEND_ENDPOINT
+# Disable telemetry
+os.putenv("DATAHUB_TELEMETRY_ENABLED", "false")
+
 @pytest.fixture(scope="session")
 def wait_for_healthchecks():
     # Simply assert that everything is healthy, but don't wait.